RTM client.
The RtmClient class is the main entry point to manage the WebSocket connection from the PHP SDK to RTM.
Use the RtmClient class to create a client instance from which you can publish messages and subscribe to channels. Create a separate Subscription object for each channel to which you want to subscribe.
RtmClient has a single-threaded model. This model imposes some limitations, most notably that read and write operations must be alternated explicitly.
RtmClient supports an event-based model. Use the $client->on<Event>() functions to register callbacks that process events as they occur.
Base syntax: $client->onEvent($callback_function);
Example:
<?php
$client = new RtmClient(ENDPOINT, APP_KEY);
$client->onConnected(function () {
echo 'Connected to Satori RTM and authenticated as ' . ROLE . PHP_EOL;
})->onError(function ($type, $error) {
echo "Type: $type; Error: $error[message] ($error[code])" . PHP_EOL;
});
$client->connect();
Each event registration method returns the $client object, so you can chain registrations.
You can register multiple callbacks on the same event:
$client = new RtmClient(ENDPOINT, APP_KEY);
$client->onConnected(function () {
echo 'Callback 1';
});
$client->onConnected(function () {
echo 'Callback 2';
});
$client->connect();
There are 4 base events:
onConnected()
onDisconnected($code, $message)
onAuthenticated()
onError($type, $error)
You can specify a role when connecting to the endpoint to get role-based permissions (e.g. access to subscribe or publish to certain channels). For more information, see https://www.satori.com/docs/using-satori/authentication
Use \RtmClient\Auth\RoleAuth to authenticate using role-based authentication:
$options = array(
'auth' => new RtmClient\Auth\RoleAuth(ROLE, ROLE_SECRET_KEY),
);
$client = new RtmClient(ENDPOINT, APP_KEY, $options);
The RTM client lets you subscribe to channels:
$client->subscribe('animals', function ($ctx, $type, $data) {
print_r($data);
});
Check the \RtmClient\Subscription\Subscription class to get more information about the possible options.
A subscription callback is called when the following subscription events occur:
SUBSCRIBED - after getting confirmation from Satori RTM about subscription
UNSUBSCRIBED - after successful unsubscribing
DATA - when getting rtm/subscription/data from Satori RTM
INFO - when getting rtm/subscription/info message
ERROR - on every rtm/subscription/error or rtm/subscribe/error
Specify a callback when creating a new subscription. Example:
use RtmClient\Subscription\Events;
$callback = function ($ctx, $type, $data) {
switch ($type) {
case Events::SUBSCRIBED:
echo 'Subscribed to: ' . $ctx['subscription']->getSubscriptionId() . PHP_EOL;
break;
case Events::UNSUBSCRIBED:
echo 'Unsubscribed from: ' . $ctx['subscription']->getSubscriptionId() . PHP_EOL;
break;
case Events::DATA:
foreach ($data['messages'] as $message) {
if (isset($message['who']) && isset($message['where'])) {
echo 'Got animal ' . $message['who'] . ': ' . json_encode($message['where']) . PHP_EOL;
} else {
echo 'Got message: ' . json_encode($message) . PHP_EOL;
}
}
break;
case Events::ERROR:
echo 'Subscription failed. ' . $data['error'] . ': ' . $data['reason'] . PHP_EOL;
break;
}
};
$subscription = $client->subscribe('animals', $callback, array(
'filter' => "SELECT * FROM `animals` WHERE who = 'zebra'",
));
Because RtmClient has a single-threaded model, you should alternate read and write operations.
Simple publish with ack example. We publish a message and require an acknowledgement from Satori RTM:
$client = new RtmClient(ENDPOINT, APP_KEY);
$client->publish(CHANNEL, 'test message', function ($ack) {
echo 'Got ack from Satori RTM';
});
$client->sockReadSync(); // Wait for reply from Satori RTM
If you do not want to block for long when reading, pass a timeout:
$client = new RtmClient(ENDPOINT, APP_KEY);
$client->publish(CHANNEL, 'test message', function ($ack) {
echo 'Got ack from Satori RTM';
});
$client->sockReadSync(2); // Wait for incoming message for 2 seconds only
If you are waiting for multiple replies, use \RtmClient\RtmClient::waitAllReplies():
$client = new RtmClient(ENDPOINT, APP_KEY);
$client->publish(CHANNEL, 'message', function ($ack) {
echo 'Got ack 1 from Satori RTM';
});
$client->publish(CHANNEL, 'message-2', function ($ack) {
echo 'Got ack 2 from Satori RTM';
});
$client->read(CHANNEL, function ($data) {
echo 'Got read data from Satori RTM';
});
$client->waitAllReplies(); // Also you can specify wait timeout in seconds
echo 'Done!';
// Output:
// Got ack 1 from Satori RTM
// Got ack 2 from Satori RTM
// Got read data from Satori RTM
// Done!
There is also an Async mode. Reading in this mode does not block if there are no incoming messages in the socket:
use RtmClient\WebSocket\ReturnCode as RC;
$client = new RtmClient(ENDPOINT, APP_KEY);
$client->publish(CHANNEL, 'test message', function ($ack) {
echo 'Got ack from Satori RTM';
});
$code = $client->sockReadAsync();
switch ($code) {
case RC::READ_OK:
echo 'Read incoming message';
break;
case RC::READ_WOULD_BLOCK:
echo 'There are no messages in socket at this moment';
break;
default:
echo 'Another return code';
}
If you subscribe to channels and want to publish messages at the same time, you can use sockReadAsync() or the Async helpers \RtmClient\RtmClient::sockReadIncoming() and \RtmClient\RtmClient::sockReadFor():
use RtmClient\Subscription\Events;
$messages_count = 0;
$client = new RtmClient(ENDPOINT, APP_KEY);
$client->subscribe(CHANNEL, function ($ctx, $type, $data) use (&$messages_count) {
if ($type == Events::DATA) {
foreach ($data['messages'] as $message) {
echo 'Got message: ' . json_encode($message) . PHP_EOL;
$messages_count++;
}
}
});
while (true) {
$client->sockReadFor(2); // Read possible messages for 2 seconds
$client->publish(ANOTHER_CHANNEL, time(), function() {
echo 'Sent time' . PHP_EOL;
});
$client->publish(MY_STAT_CHANNEL, $messages_count, function() {
echo 'Sent messages count' . PHP_EOL;
});
}
An RtmClient instance is a one-time connection: you cannot continue using the client after the connection is dropped.
To make a new connection to Satori RTM, you can clone the previous client:
$new_client = clone $old_client;
$new_client->connect();
All your callbacks are carried over to the new client. After you call connect(), the client establishes a new connection to Satori RTM.
Note that you need to restore your subscriptions manually.
See the reconnect examples.
| Tag | Description |
|---|---|
| example | Authentication example |
| example | Change the filter of an existing subscription |
| example | Publish example |
| example | Continuously publish while handling disconnects |
| example | Continuously publish and restore the subscription on disconnects |
| example | Subscription example |
| example | Event handlers example |
| package | Default |
__clone()
Uses previously added client callbacks and subscriptions.
__construct(string $endpoint, string $appkey, array $options = array())
Throws |
|
---|
string
Endpoint for RTM. Available from the Dev Portal
string
Appkey used to access RTM. Available from the Dev Portal
array
Additional parameters for the RTM client instance
$options = [
'auth' => (Auth\iAuth) Any instance that implements iAuth
'logger' => (\Psr\Log\LoggerInterface) Custom logger
]
appendVersion(string $endpoint) : string
Before: wss://some.endpoint.com
After: wss://some.endpoint.com/v2
string
Custom endpoint.
string
Endpoint with added RTM_VER.
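The Before/After pair above can be reproduced with a small helper. The following is a minimal sketch, not the SDK's implementation: the function name append_version_sketch and the constant RTM_VER_SKETCH are hypothetical, and the value 'v2' is taken from the example above rather than from the SDK source. Trimming a trailing slash is also an assumption made here for illustration.

```php
<?php
// Hypothetical stand-in for RtmClient::RTM_VER; 'v2' comes from the example above.
const RTM_VER_SKETCH = 'v2';

/**
 * Appends the protocol version to an endpoint (illustrative only).
 */
function append_version_sketch(string $endpoint): string
{
    // Drop any trailing slash so the result never contains a double slash.
    return rtrim($endpoint, '/') . '/' . RTM_VER_SKETCH;
}

echo append_version_sketch('wss://some.endpoint.com') . PHP_EOL;
```

With the endpoint from the example, the sketch yields wss://some.endpoint.com/v2.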
buildConnectionUrl(string $endpoint, string $appkey, string $hash = '') : string
string
Server URL with schema
string
Application key
string
URL hash. Used for persistent connections
string
checkConnected(string $action) : true
Fires an ERROR event of type RtmClient::ERROR_TYPE_APPLICATION if not connected.
string
Which action checks the connection
true
if connected, false otherwise
close(integer $status = 1000, string $reason = 'Connection closed') : void
integer
Close status code.
string
Any message that will be sent in the close frame.
connect() : boolean
Upgrades the connection to WebSocket. See the troubleshooting section in the README.md file if you fail to connect to an endpoint.
Throws |
|
---|
boolean
true if connection has been established, false otherwise
delete(string $channel, callable $callback = null, array $extra = array()) : true
The RtmClient client must be connected.
Throws |
|
---|
string
Channel name
callable
Function to attach and execute on the response PDU from RTM. The response PDU body is passed as a parameter to this function. RTM does not send a response PDU if a callback is not specified.
array
Additional request options. These extra values are sent to RTM in the body element of the PDU that represents the request.
true
if delete PDU has been sent to RTM, false otherwise
disconnectAllSubscriptions() : void
fire()
Executes callback functions and passes data to them.
getSubscription(string $subscription_id) : \RtmClient\Subscription\Subscription|null
string
Subscription id or channel name
\RtmClient\Subscription\Subscription|null
null if subscription not found
getSubscriptions() : array<mixed,\RtmClient\Subscription\Subscription>
isConnected() : boolean
boolean
true if connected, false otherwise
off(string $event, callable $callback)
Use the callback function that you used when calling "on".
string
Event name
callable
Callback function that was used when calling "on"
on(string $event, callable $callback)
string
Event name
callable
Function to be called when the event is fired
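To make the relationship between on(), off(), fire() and chained registration concrete, here is a minimal sketch of an event registry. It is an assumption about the general pattern, not the SDK's code: the class EventEmitterSketch and the event name 'connected' are hypothetical.

```php
<?php
// Illustrative only: EventEmitterSketch is a hypothetical class, not part of the SDK.
class EventEmitterSketch
{
    /** @var array Callbacks grouped by event name. */
    private $events = array();

    // Register a callback; returning $this allows chained registrations.
    public function on(string $event, callable $callback): self
    {
        $this->events[$event][] = $callback;
        return $this;
    }

    // Remove a callback; it must be the same callable that was passed to on().
    public function off(string $event, callable $callback): self
    {
        $callbacks = $this->events[$event] ?? array();
        $this->events[$event] = array_values(array_filter(
            $callbacks,
            function ($registered) use ($callback) {
                return $registered !== $callback;
            }
        ));
        return $this;
    }

    // Call every callback registered for the event, in registration order.
    public function fire(string $event, ...$args): void
    {
        foreach ($this->events[$event] ?? array() as $callback) {
            $callback(...$args);
        }
    }
}

$log = array();
$first = function ($who) use (&$log) { $log[] = "first:$who"; };
$second = function ($who) use (&$log) { $log[] = "second:$who"; };

$emitter = new EventEmitterSketch();
$emitter->on('connected', $first)->on('connected', $second);
$emitter->fire('connected', 'a');   // both callbacks run
$emitter->off('connected', $first);
$emitter->fire('connected', 'b');   // only the second callback runs

echo implode(', ', $log) . PHP_EOL;
```

The sketch keeps a reference to each closure in a variable, which is why off() can find it again; an anonymous function passed inline to on() could not be removed this way.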
onAuthenticated(callable $callback) : $this
callback function params:
callable
$this
onConnected(callable $callback) : $this
Called when the connection has been established and authentication (optional) is completed. If the connection type is persistent, the event is fired after a previously established connection is obtained.
callback function params:
callable
$this
onDisconnected(callable $callback) : $this
callback function params ($code, $message):
callable
$this
onError(callable $callback) : $this
callback function params ($type, $error):
callable
$this
persistentConnection(string $endpoint, string $appkey, array $options = array())
The endpoint, appkey, and optional connection_id together form the key used to check whether an instance has already been created.
Singleton.
Throws |
|
---|
string
Endpoint for RTM. Available from the Dev Portal
string
Appkey used to access RTM. Available from the Dev Portal
array
Additional parameters for the RTM client instance
$options = [
'auth' => (Auth\iAuth) Any instance that implements iAuth
'logger' => (\Psr\Log\LoggerInterface) Custom logger
'connection_id' => string Provides ability to create different connections to the same endpoint
]
Usage:
$client = RtmClient::persistentConnection('wss://endpoint.satori.com', 'appkey1234', array(
'connection_id' => 'connection1', // optional
));
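The keyed lookup described above (endpoint, appkey and optional connection_id identifying one instance) can be sketched as a small registry. This is an illustration under stated assumptions, not the SDK's implementation: PersistentRegistrySketch is a hypothetical class, the md5-based key format is invented for the example, and a stdClass object stands in for the client. The sketch only covers lookup within one PHP process.

```php
<?php
// Illustrative only: PersistentRegistrySketch is hypothetical, not the SDK's code.
class PersistentRegistrySketch
{
    /** @var array Instances indexed by connection key. */
    private static $instances = array();

    // Build one key from the endpoint, appkey and optional connection_id.
    // The md5-based format is an assumption made for this example.
    public static function key(string $endpoint, string $appkey, array $options = array()): string
    {
        $connectionId = $options['connection_id'] ?? '';
        return md5($endpoint . '|' . $appkey . '|' . $connectionId);
    }

    // Return the instance stored under the key, creating it on first use.
    public static function get(string $endpoint, string $appkey, array $options = array()): stdClass
    {
        $key = self::key($endpoint, $appkey, $options);
        if (!isset(self::$instances[$key])) {
            $instance = new stdClass();   // stands in for a client object
            $instance->key = $key;
            self::$instances[$key] = $instance;
        }
        return self::$instances[$key];
    }
}

$endpoint = 'wss://endpoint.satori.com';
$a = PersistentRegistrySketch::get($endpoint, 'appkey1234', array('connection_id' => 'connection1'));
$b = PersistentRegistrySketch::get($endpoint, 'appkey1234', array('connection_id' => 'connection1'));
$c = PersistentRegistrySketch::get($endpoint, 'appkey1234', array('connection_id' => 'connection2'));

var_dump($a === $b); // same key, same instance
var_dump($a === $c); // different connection_id, different instance
```

Using a distinct connection_id is what lets two independent connections to the same endpoint and appkey coexist.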
processCallback(callable $callback) : callable
Converts PduRC codes to \RtmClient::CODE_ERROR and \RtmClient::CODE_OK
callable
User callback function
callable
callback wrapper
processException(\Exception $e) : void
\Exception
Instance of Exception
processSubscriptionRequests(\RtmClient\Pdu\Pdu $pdu) : true
\RtmClient\Pdu\Pdu
true
if we matched subscription_id in PDU with Subscription in internal subscriptions, false otherwise
publish(string $channel, mixed $message, callable $callback = null, array $extra = array()) : true
The RtmClient client must be connected.
Example:
$animal = array(
'who' => 'zebra',
'where' => [34.134358, -118.321506],
);
$client->publish('animals', $animal, function ($code, $response) {
if ($code == RtmClient::CODE_OK) {
echo 'Publish confirmed!' . PHP_EOL;
} else {
echo 'Failed to publish. Error: ' . $response['error'] . '; Reason: ' . $response['reason'] . PHP_EOL;
}
});
Throws |
|
---|
string
Channel name
mixed
Any type that can be serialized via json_encode
callable
Function to attach and execute on the response PDU from RTM. The response PDU body is passed as a parameter to this function. RTM does not send a response PDU if a callback is not specified.
array
Additional request options. These extra values are sent to RTM in the body element of the PDU that represents the request.
true
if message has been sent, false otherwise
read(string $channel, callable $callback, array $extra = array()) : true
The RtmClient client must be connected.
Example:
$client->read('animals', function ($code, $body) {
if ($code == RtmClient::CODE_OK) {
echo $body['message'];
}
});
Throws |
|
---|
string
Channel name
callable
Function to attach and execute on the response PDU from RTM. The response PDU body is passed as a parameter to this function. RTM does not send a response PDU if a callback is not specified.
array
Additional request options. These extra values are sent to RTM in the body element of the PDU that represents the request.
true
if read PDU has been sent to RTM, false otherwise
sendWebSocketPing(string $text = 'ping') : true
Throws |
|
---|
string
Text to be sent as ping payload
true
if sent
socketSend(string $action, array $body, callable $callback = null) : true
Throws |
|
---|
string
PDU action
array
PDU body
callable
user callback on getting response from Satori RTM
true
if successfully sent the PDU, false otherwise
sockRead(\RtmClient\WebSocket\Client::ASYNC_READ|\RtmClient\WebSocket\Client::SYNC_READ $mode, integer $timeout_sec, integer $timeout_microsec) : \RtmClient\WebSocket\ReturnCode|false
Throws |
|
---|
integer
The seconds part of the timeout to be set
integer
The microseconds part of the timeout to be set
\RtmClient\WebSocket\ReturnCode|false
false if not connected
sockReadAsync() : \RtmClient\WebSocket\ReturnCode|false
BE AWARE: Async mode is used only to determine whether the incoming buffer contains any data. This means that you WILL NOT be blocked if the buffer is empty.
However, the SDK still uses Sync mode to read a whole WebSocket frame. This means that you WILL be blocked if the incoming buffer holds only part of a WebSocket frame.
TODO: The SDK will support full Async mode in future versions.
Throws |
|
\RtmClient\WebSocket\ReturnCode|false
false if not connected
sockReadFor(integer $seconds, integer $microseconds) : void
Throws |
|
---|
integer
The seconds part of the maximal reading time to be set
integer
The microseconds part of the maximal reading time to be set
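A bounded read of this kind is commonly built as a loop around non-blocking read attempts with a deadline. The sketch below illustrates that pattern only; it is not the SDK's implementation. The names read_for_sketch and $readOnce are hypothetical, and a plain array simulates the socket buffer.

```php
<?php
/**
 * Illustrative only: read_for_sketch and $readOnce are hypothetical names.
 * $readOnce stands in for one non-blocking read attempt and returns true
 * when it consumed a message, false when nothing was available.
 */
function read_for_sketch(callable $readOnce, int $seconds, int $microseconds = 0): int
{
    $deadline = microtime(true) + $seconds + $microseconds / 1000000;
    $messages = 0;
    while (microtime(true) < $deadline) {
        if ($readOnce()) {
            $messages++;
        } else {
            usleep(1000); // nothing to read: yield briefly instead of spinning
        }
    }
    return $messages;
}

// Simulated socket buffer holding three pending messages.
$buffer = array('m1', 'm2', 'm3');
$readOnce = function () use (&$buffer) {
    return array_shift($buffer) !== null;
};

$started = microtime(true);
$count = read_for_sketch($readOnce, 0, 200000); // read for 0.2 seconds
$elapsed = microtime(true) - $started;

echo "Read $count messages in " . round($elapsed, 2) . ' seconds' . PHP_EOL;
```

The loop keeps reading until the deadline even after the buffer is empty, which is what makes such a call suitable for alternating with publish operations in a single-threaded client.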
sockReadIncoming() : void
Reads messages until an error occurs or until there are no more messages in the buffer.
Throws |
|
---|
sockReadSync(integer $timeout_sec, integer $timeout_microsec) : \RtmClient\WebSocket\ReturnCode|false
The application is blocked until a message arrives.
Throws |
|
---|
integer
The seconds part of the timeout to be set
integer
The microseconds part of the timeout to be set
\RtmClient\WebSocket\ReturnCode|false
false if not connected
subscribe(string $subscription_id, callable $callback, array $options = array()) : void
When you create a channel subscription, you can specify additional properties, for example, add a filter to the subscription and specify the behavior of the SDK when resubscribing after a reconnection.
For more information about the options for a channel subscription, see Subscribe PDU in the online docs.
Simple example:
$client = new RtmClient(ENDPOINT, APP_KEY, array(
'auth' => new RoleAuth(ROLE, ROLE_SECRET_KEY),
));
$client->connect() or die;
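// subscribe() requires a callback; it receives every subscription event
$client->subscribe('animals', function ($ctx, $type, $data) {
if ($type == \RtmClient\Subscription\Events::DATA) {
foreach ($data['messages'] as $message) {
echo 'Got message: ' . json_encode($message) . PHP_EOL;
}
}
});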
Subscribe with filter/view (Stream SQL):
$client = new RtmClient(ENDPOINT, APP_KEY, array(
'auth' => new RoleAuth(ROLE, ROLE_SECRET_KEY),
));
$client->connect() or die;
// The callback is the second argument; options such as 'filter' go third
$client->subscribe('animals', function ($ctx, $type, $data) {
if ($type == \RtmClient\Subscription\Events::DATA) {
foreach ($data['messages'] as $message) {
echo 'Got message: ' . json_encode($message) . PHP_EOL;
}
}
}, array(
'filter' => "SELECT * FROM `animals` WHERE who = 'zebra'",
));
example |
Subscribe to channel |
---|---|
Throws |
|
string
String that identifies the channel. If you do not use the filter parameter, it is the channel name. Otherwise, it is a unique identifier for the channel (subscription id).
callable
Custom callback. It is called on any of the subscription events described in RtmClient\Subscription\Events. The callback function receives 3 arguments:
$ctx - Context. Current subscription instance
$type - Event type: see RtmClient\Subscription\Events
$data - Type-related data. See Protocol Data Unit (PDU) for information about the data content
array
Additional subscription options for a channel subscription. These options are sent to RTM in the body element of the Protocol Data Unit (PDU) that represents the subscribe request. For more information about the body element of a PDU, see RTM API in the online docs.
unsubscribe(string $subscription_id) : \RtmClient\self::ERROR_CODE_UNKNOWN_SUBSCRIPTION
Throws |
|
---|
string
Subscription id or channel name.
\RtmClient\self::ERROR_CODE_UNKNOWN_SUBSCRIPTION
if no subscription found. true if Unsubscribe PDU has been sent, false otherwise.
waitAllReplies(integer $timeout_sec, integer $timeout_microsec) : void
Throws |
|
---|
integer
The seconds part of the maximal awaiting time to be set
integer
The microseconds part of the maximal awaiting time to be set
write(string $channel, mixed $message, callable $callback = null, array $extra = array()) : true
The RtmClient client must be connected.
Throws |
|
---|
string
Channel name
mixed
Any type that can be serialized via json_encode
callable
Function to attach and execute on the response PDU from RTM. The response PDU body is passed as a parameter to this function. RTM does not send a response PDU if a callback is not specified.
array
Additional request options. These extra values are sent to RTM in the body element of the PDU that represents the request.
true
if message has been sent, false otherwise
RTM_VER
CODE_OK
CODE_ERROR
ERROR_TYPE_APPLICATION
ERROR_TYPE_CONNECTION
ERROR_TYPE_AUTHENTICATION
ERROR_CODE_EMPTY_ENDPOINT
ERROR_CODE_EMPTY_APPKEY
ERROR_CODE_NOT_AUTH_INTERFACE
ERROR_CODE_UNKNOWN_SUBSCRIPTION
ERROR_CODE_NOT_CONNECTED
ERROR_CODE_CLIENT_IN_USE
ERROR_CODE_PERSISTENT_SUBSCRIBE
is_connected : boolean
var |
---|
boolean
subscriptions : array
var |
---|
array
logger : \Psr\Log\LoggerInterface
var |
---|
\Psr\Log\LoggerInterface
once_connected : boolean
var |
---|
boolean
events : array
var |
---|
array
stub : array
Required to avoid the "Missing numbers of arguments" error if a callback function requires arguments that were not passed to fire()
var |
---|
array