The following table lists the Python SDK modules and classes:
Provides a low-level API for managing a connection to RTM.
You can use the Connection object as long as it stays connected to the RTM. If a disconnect occurs, you must create a new Connection object, resubscribe to all channels, and perform authentication again, if necessary.
Note
The satori.rtm.client module includes a default implementation to handle disconnects automatically and reconnect and resubscribes as necessary.
Constructor for the Connection class. Creates and returns an instance of the Connection class. Use this function to create a instance from which you can subscribe and publish, authenticate an application user, and manage the WebSocket connection to RTM. The Connection class allows you to publish and subscribe synchronously and asynchronously.
The endpoint and appkey parameters are required. Optionally, you can choose to create a delegate to process received messages and handle connection and channel errors. To set the delegate property, specify it in the constructor or use connection.delegate = MyCustomDelegate().
Syntax
... connection = Connection(endpoint, appkey, delegate=None) after_receive = threading.Event() class ConnectionDelegate(object): def on_connection_closed(self): print('connection closed') def on_internal_error(error): print('internal error', error) def on_subscription_data(data): print('data:', data) after_receive.set() connection.delegate = ConnectionDelegate() connection.start()
Closes a WebSocket connection to RTM for the Connection object.
Use this method if you want to explicitly stop all interaction with RTM. After you use this method, you can no longer publish or subscribe to any channels for the Connection object. You must use start() to restart the WebSocket connection and then publish or subscribe.
Publishes a message to the specified channel.
The channel and message parameters are required. The message parameter can be any JSON-supported value. For more information, see www.json.org.
By default, this method does not acknowledge the completion of the publish operation. Optionally, you can specify a callback function to process the response from RTM. If you specify a callback, RTM returns an object that represents the PDU response to the publish request. For more information about PDUs, see RTM API in the online docs.
Because this is an asynchronous method, you can also use the Python threading module to create an event to track completion of the publish operation in the callback function.
Syntax
connection.start() connection.publish("My Channel", "Message text to publish")
Asynchronously reads a value from the specified channel. This function has no return value, but you can inspect the response PDU in the callback function.
You can also use the args parameter to add additional JSON key-value pairs to the PDU in the read request that the SDK sends to RTM. For more information about PDUs, see RTM API in the online docs.
By default, this method does not acknowledge the completion of the subscribe operation. Optionally, you can specify a callback function to process the response from RTM. If you specify a callback, RTM returns an object that represents the response to the publish request as a PDU.
Syntax
connection.start() position = connection.publish_sync(channel, message) connection.subscribe(channel, {'position': position})
Synchronously reads a message from the specified channel.
This method generates a RuntimeError if the read operation does not complete within the timeout period.
Syntax
connection.start() message = 'hello' connection.publish_sync(channel, message) value = connection.read_sync(channel) # value should be "hello" ...
Asynchronously writes a value into the specified channel.
The channel and value parameters are required. The value parameter can be any JSON-supported value. For more information, see www.json.org.
By default, this method does not acknowledge the completion of the publish operation. Optionally, you can specify a callback function to process the response from RTM. If you specify a callback, RTM returns an object that represents the response to the publish request as a PDU. For more information about PDUs, see the RTM API Reference.
Because this is an asynchronous method, you can also use the Python threading module to create an event to track completion of the write operation in the callback function.
Syntax
connection.start() connection.write("my_dog", {"latitude": 52.52, "longitude":13.405})
connection.start() mailbox = [] event = threading.Event() def delete_callback(reply): mailbox.append(reply) event.set() connection.delete("old_stuff", callback=delete_callback) if not event.wait(5): print('Delete request timed out') else: print('Delete request returned {0}'.format(mailbox[0]))
Synchronously publishes a message to the specified channel and returns the position property for the message stream position to which the message was published. For more information about the position value, see RTM API in the online docs.
This method generates a RuntimeError if the publish operation does not complete within the timeout period.
The message parameter can be any JSON-supported value. For more information, see www.json.org.
Note
To send a publish request asynchronously for a Connection object, use publish(channel, message, callback).
Syntax
connection.start() position = connection.publish_sync(channel, message) connection.subscribe_sync(channel, {'position': position}) ...
Subscribes to the specified channel.
You can use the args parameter to add additional JSON values to the Protocol Data Unit (PDU) in the subscribe request that the SDK sends to RTM. For more information about PDUs, see RTM API in the online docs.
By default, this method does not acknowledge the completion of the subscribe operation. Optionally, you can specify a callback function to process the response from RTM. If you specify a callback, RTM returns an object that represents the PDU response to the publish request.
Note
To receive data published to a channel after you subscribe to it, use the on_subscription_data() callback function in a subscription observer class.
Syntax
connection.start() position = connection.publish_sync(channel, message) connection.subscribe(channel, {'position': position})
Subscribes to the specified channel and generates a RuntimeError if the request does not complete within the timeout period.
You can use the args parameter to add additional JSON values to the PDU in the subscribe request that the SDK sends to RTM. For more information about PDUs, see RTM API in the online docs.
Syntax
... connection.start() position = connection.publish_sync(channel, message) connection.subscribe_sync(channel, {'position': position}) ...
Unsubscribes from the specified channel.
After you unsubscribe, the application no longer receives messages for the channel until after RTM completes the unsubscribe operation.
By default, this method does not acknowledge the completion of the subscribe operation. Optionally, you can specify a callback function to process the response from RTM. If you specify a callback, RTM returns an object that represents the PDU response to the publish request. For more information about PDUs, see RTM API in the online docs.
Syntax
... connection.start() position = connection.publish_sync(channel, message) connection.subscribe(channel, {'position': position}) ... connection.unsubscribe(channel) ...
Syntax
... connection.start() position = connection.publish_sync(channel, message) connection.subscribe_sync(channel, {'position': position}) ... unsubscribe_sync(channel) ...
Asynchronously performs a channel search for a given user-defined prefix. This method passes RTM replies to the callback. RTM may send multiple responses to the same search request: zero or more search result PDUs with an action of rtm/search/data (depending on the results of the search). Each channel found is only sent once.
After the search result PDUs, RTM follows with a positive response PDU: rtm/search/ok. Callback must inspect the reply object passed to the callback for the reply['body']['channels'] list. The callback is called on each response.
Validates the identity of a client after connecting to RTM with the Connection module. After the user authenticates with RTM, the operations that the client can perform depends on the role.
Since the authentication process is an asynchronous operation, the callback function is required. The callback function processes the PDU response from RTM.
For more information about authentication, see Authentication and Authorization in the online docs.
Syntax
secret_key = '<ROLE_SECRET_KEY>' auth_delegate = auth.RoleSecretAuthDelegate('<ROLE>', secret_key) auth_event = threading.Event() def auth_callback(auth_result): if type(auth_result) == auth.Done: auth_event.set()
The satori.rtm.client module is the main entry point to manage the WebSocket connection from the Python SDK to RTM. Use the Client class to create a client instance from which you can publish messages and subscribe to channels.
This class routes messages to respective subscription observers and automatically reconnects and restores the authentication and subscription state if the connection to RTM drops.
This is the documentation for Client class
endpoint {string} [required] - RTM endpoint as a string. Example: "wss://rtm:8443/foo/bar". If port number is omitted, it defaults to 80 for ws:// and 443 for wss://. Available from the Dev Portal.
appkey {string} [required] - Appkey used to access RTM. Available from the Dev Portal.
reconnect_interval {int} [optional] - Time period, in seconds, between reconnection attempts. The timeout period between each successive connection attempt increases, but starts with this value. Use max_reconnect_interval to specify the maximum number of seconds between reconnection attempts. Default is 1.
max_reconnect_interval {int} [optional] - Maximum period of time, in seconds, to wait between reconnection attempts. Default is 300.
fail_count_threshold {int} [optional] - Number of times the SDK should attempt to reconnect if the connection disconnects. Specify any value that resolves to an integer. Default is inf (infinity).
observer {client_observer} [optional] - Instance of a client observer class, used to define functionality based on the state changes of a Client.
Set this property with client.observer or in the make_client(*args, **kwargs) or Client(*args, **kwargs) methods.
restore_auth_on_reconnect {boolean} optional - Whether to restore authentication after reconnects. Default is True.
max_queue_size {int} optional - this parameter limits the amount of concurrent requests in order to avoid 'out of memory' situation. For example is max_queue_size is 10 and the client code sends 11 publish requests so fast that by the time it sends 11th one the reply for the first one has not yet arrived, this 11th call to client.publish will throw the satori.rtm.client.Full exception.
https_proxy (string, int) [optional] - (host, port) tuple for https proxy
Syntax
- ::
from satori.rtm.client import Client
client = Client(endpoint='<ENDPOINT>', appkey=<APP_KEY>) ...
Starts a WebSocket connection to RTM for the Client object. You must call the start() method before you subscribe to a channel using the Client object methods.
If you publish any messages before calling this method, the SDK queues the messages to publish after establishing the WebSocket connection.
with sc.make_client( endpoint=endpoint, appkey=appkey) as client: client.stop() ... client.start() ...
Closes a WebSocket connection to RTM for the Client object.
Use this method if you want to explicitly stop all interaction with RTM. After you use this method, if you call publish or subscribe methods while the client is stopped, the SDK queues the requests and sends them when the client reconnects.
with make_client( endpoint=endpoint, appkey=appkey) as client: ... client.stop() ...
Validates the identity of an application user after connecting to RTM with the Client class. After the user authenticates with RTM, the operations that the client can perform depends on the role.
Since the authentication process is an asynchronous operation, the callback function is required. The callback function processes the PDU response from RTM.
For more information about authentication, see Authentication and Authorization in the online docs.
secret_key = '<ROLE_SECRET_KEY>' auth_delegate = auth.RoleSecretAuthDelegate('<ROLE>', secret_key) auth_event = threading.Event() def auth_callback(auth_result): if type(auth_result) == auth.Done: auth_event.set() client.authenticate(auth_delegate, auth_callback) auth_event.wait()
Publishes a message to the specified channel.
The channel and message parameters are required. The message parameter can be any JSON-supported value. For more information, see www.json.org.
By default, this method does not acknowledge the completion of the publish operation. Optionally, you can specify a callback function to process the response from RTM. If you specify a callback, RTM returns an object that represents the Protocol Data Unit (PDU) response to the publish request. For more information about PDUs, see RTM API in the online docs.
with sc.make_client( endpoint=endpoint, appkey=appkey) as client: ... print('Publishing a message') client.publish(channel=channel, message=message)
Asynchronously reads a value from the specified channel. This function has no return value, but you can inspect the reply PDU in the callback function.
You can also use the args parameter to add additional JSON key-value pairs to the PDU in the read request that the SDK sends to RTM. For more information about PDUs, see RTM API in the online docs.
with make_client(endpoint=endpoint, appkey=appkey) as client: mailbox = [] event = threading.Event() def read_callback(reply): mailbox.append(reply) event.set() client.read(channel, callback=read_callback) if not event.wait(5): print('Read request timed out') else: print('Read request returned {0}'.format(mailbox[0]))
with make_client(endpoint=endpoint, appkey=appkey) as client: mailbox = [] event = threading.Event() def write_callback(reply): mailbox.append(reply) event.set() client.write("answer", 42, callback=write_callback) if not event.wait(5): print('Write request timed out') else: print('Write request returned {0}'.format(mailbox[0]))
with make_client(endpoint=endpoint, appkey=appkey) as client: mailbox = [] event = threading.Event() def delete_callback(reply): mailbox.append(reply) event.set() client.delete("old_stuff", callback=delete_callback) if not event.wait(5): print('Delete request timed out') else: print('Delete request returned {0}'.format(mailbox[0]))
Subscribes to the specified channel.
Optionally, you can also use an observer that implements the subscription callback functions and pass the observer as the subscription_observer parameter. The callback functions represent each possible state for the channel subscription. See Subscription Observer.
You can also use the args parameter to add additional JSON key-value pairs to the PDU in the subscribe request that the SDK sends to RTM. For more information about PDUs, see RTM API in the online docs.
Note
To receive data published to a channel after you subscribe to it, use the on_subscription_data() callback function in a subscription observer.
with make_client( endpoint=endpoint, appkey=appkey) as client: class SubscriptionObserver(object): def on_subscription_data(self, data): for message in data['messages']: print('Client got message {0}'.format(message)) subscription_observer = SubscriptionObserver() client.subscribe( channel, SubscriptionMode.RELIABLE, subscription_observer)
Unsubscribes from a channel.
After you unsubscribe, the application no longer receives messages for the channel. To identify when the unsubscribe operation has completed, use the on_leave_subscribed() callback function of a subscription observer class.
with make_client( endpoint=endpoint, appkey=appkey) as client: ... client.subscribe( "My Channel", SubscriptionMode.RELIABLE, subscription_observer) ... client.unsubscribe("My Channel")
Asynchronously performs a channel search for a given user-defined prefix. This method passes RTM replies to the callback. RTM may send multiple responses to the same search request: zero or more search result PDUs with an action of rtm/search/data (depending on the results of the search). Each channel found is only sent once.
After the search result PDUs, RTM follows with a positive response PDU: rtm/search/ok. Callback must inspect the reply object passed to the callback for the reply['body']['channels'] list. The callback is called on each response.
with sc.make_client( endpoint=platform_endpoint, appkey=platform_appkey) as client: ... if client.is_connected() # do something else: # do something else
The make_client() function is a context manager. Call make_client() using a with statement and the SDK automatically starts the WebSocket connection. The SDK stops and then closes the WebSocket connection when the statement completes or terminates due to an error.
This function takes the same parameters as the Client constructor plus optional auth_delegate.
To use this function, import it from the client module:
`from satori.rtm.client import make_client`
endpoint {string} [required] - RTM endpoint as a string. Example: "wss://rtm:8443/foo/bar". If port number is omitted, it defaults to 80 for ws:// and 443 for wss://. Available from the Dev Portal.
appkey {string} [required] - Appkey used to access RTM. Available from the Dev Portal.
reconnect_interval {int} [optional] - Time period, in seconds, between reconnection attempts. The timeout period between each successive connection attempt increases, but starts with this value. Use max_reconnect_interval to specify the maximum number of seconds between reconnection attempts. Default is 1.
max_reconnect_interval {int} [optional] - Maximum period of time, in seconds, to wait between reconnection attempts. Default is 300.
fail_count_threshold {int} [optional] - Number of times the SDK should attempt to reconnect if the connection disconnects. Specify any value that resolves to an integer. Default is inf (infinity).
observer {client_observer} [optional] - Instance of a client observer class, used to define functionality based on the state changes of a Client.
Set this property with client.observer or in the make_client(*args, **kwargs) or Client(*args, **kwargs) methods.
restore_auth_on_reconnect {boolean} optional - Whether to restore authentication after reconnects. Default is True.
max_queue_size {int} optional - this parameter limits the amount of concurrent requests in order to avoid 'out of memory' situation. For example is max_queue_size is 10 and the client code sends 11 publish requests so fast that by the time it sends 11th one the reply for the first one has not yet arrived, this 11th call to client.publish will throw the satori.rtm.client.Full exception.
auth_delegate {AuthDelegate} [optional] - if auth_delegate parameter is present, the client yielded by make_client will be already authenticated.
import satori.rtm.client as sc endpoint = 'ENDPOINT' appkey = 'APPKEY' with sc.make_client(endpoint=endpoint, appkey=appkey) as client:
Use the client observer callback functions in an observer to implement functionality based on the Client object state changes.
Set this observer with the client.observer property on the Client.
The following table lists the Client object states and the associated callback functions:
Client State | Enter Callback | Exit Callback |
---|---|---|
Awaiting | on_enter_awaiting() | on_leave_awaiting() |
Connecting | on_enter_connecting() | on_leave_connecting() |
Connected | on_enter_connected() | on_leave_connected() |
Stopped | on_enter_stopped() | on_leave_stopped() |
Disposed | on_enter_disposed() | n/a |
The following figure shows an example client observer with implemented callback function:
class ClientObserver(object): def __init__(self): self.connection_attempt_count = 0 def on_enter_connecting(self): self.connection_attempt_count += 1 print('Establishing connection #{0}'.format( self.connection_attempt_count)) client = Client(endpoint='<ENDPOINT>', appkey=None) client.observer = ClientObserver() client.start() client.stop() client.start()
Use callback functions in a subscription observer to implement functionality based on the state changes for a channel subscription. The subscribe(channel, SubscriptionMode.RELIABLE, subscription_observer, args) method takes a subscription observer for the subscription_observer parameter.
Note
Depending on your application, these callbacks are optional, except on_subscription_data. To process received messages, you must implement on_subscription_data(data) callback.
The following table lists a subscription observer subscription states and callback functions:
State | Enter Callback | Exit Callback |
---|---|---|
Subscribing | on_enter_subscribing() | on_leave_subscribing() |
Subscribed | on_enter_subscribed() | on_leave_subscribed() |
Unsubscribing | on_enter_unsubscribing() | on_leave_unsubscribing() |
Unsubscribed | on_enter_unsubscribed() | on_leave_unsubscribed() |
Failed | on_enter_failed() | on_leave_failed() |
Deleted | on_deleted() | n/a |
Other Callbacks
Event | Callback |
---|---|
Created | on_created() |
Message(s) Received | on_subscription_data() |
The following figure shows an example subscription observer with an implemented callback function:
class SubscriptionObserver(object): def __init__(self, channel): self.message_count = 0 self.channel = channel def on_subscription_data(self, data): for message in data['messages']: print('Got message {0}'.format(message)) self.message_count += len(data['messages']) def on_enter_subscribed(self): print('Subscription is now active') def on_deleted(self): print('Received {0} messages from channel ""{1}""'.format( self.message_count, self.channel)) subscription_observer = SubscriptionObserver() client.subscribe( channel, SubscriptionMode.RELIABLE, subscription_observer(channel)) # wait for some time client.unsubscribe(channel)
You can perform role-based authentication with the Python SDK. This method uses a role and role secret key from the Dev Portal and authenticates a client session with that role.
The operations that the client can perform depend on the permissions for the role.
The role-based authentication method is a two-step authentication process based on the HMAC process, using the MD5 hashing routine:
Use the provided class satori.rtm.auth.RoleSecretAuthDelegate to create a delegate (that knows the authentication process) and use the delegate with the authenticate(role_auth_delegate, auth_callback) method of the satori.rtm.client.Client or satori.rtm.connection.Connection class. The SDK calls auth_callback on the response from RTM.
For more information, see Authentication and Authorization in the online docs.
Note
Automatic reauthentication can be disable by passing 'restore_auth_on_reconnect=False' to Client constructor or to make_client.
Use the client or connection authenticate method with the authentication delegate and a callback to process the RTM response to the authentication request:
secret_key = '<ROLE_SECRET_KEY>' with sc.make_client( endpoint=endpoint, appkey=platform_appkey) as client: role_auth_delegate = auth.RoleSecretAuthDelegate(\ '<USER_ROLE>', secret_key) auth_ack = threading.Event() def auth_callback(auth_result): if type(auth_result) == auth.Done: print('Auth success') auth_ack.set() else: print('Auth failure: {0}'.format(auth_result)) auth_ack.set() client.authenticate(role_auth_delegate, auth_callback) if not auth_ack.wait(10): raise RuntimeError('No authentication reply in reasonable time')
The Python SDK includes a logging.Logger object in the satori.rtm.logger module. You can configure this logger to your specific needs. You can set the logging verbosity to Debug during debugging to find error sources faster.
To enable stderr-based verbose logging on the command-line, set the DEBUG_SATORI_SDK environment variable to debug:
export DEBUG_SATORI_SDK=debug ./example.py # this now produces verbose logs unset DEBUG_SATORI_SDK ./example.py # this does not