var W3CWebSocket = require('./websocket.js');
var Observer = require('./observer.js');
var Subscription = require('./subscription.js');
var logger = require('./logger.js');
var auth = require('./auth.js');
var objectAssign = require('object-assign');
var CMap = require('./map.js');
var CBOR = require('./cbor.js');
// RTM API version tag.
// NOTE(review): presumably appended to the endpoint by _appendVersion() — confirm.
var RTM_VER = 'v2';
// Names of the client connection states. The public "enter-<state>" / "leave-<state>"
// event names described in the RTM class documentation are built from these values.
var STOPPED = 'stopped';
var CONNECTING = 'connecting';
var CONNECTED = 'connected';
var AWAITING = 'awaiting';
// Starts out empty.
// NOTE(review): presumably filled in later in this file with the per-state transition
// tables of the connection state machine — confirm.
var STATES = {};
/**
* Create a RTM client instance.
* @class
* @augments Observer
*
* @description
* An RTM client is the main entry point for accessing RTM.
*
* To connect a client to RTM, you must call [RTM.start()]{@link RTM#start}. The RTM SDK attempts
* to reconnect to RTM if the connection to RTM fails for any reason.
*
* A client instance can be in one of the following connection states:
* - <strong>stopped:</strong> You called [RTM.stop()]{@link RTM#stop} or RTM disconnected and
* hasn't yet reconnected.
* - <strong>connecting:</strong> You called [RTM.start()]{@link RTM#start} or the client is
* trying to reconnect to RTM.
* - <strong>connected:</strong> The client is connected to RTM.
* - <strong>awaiting:</strong> The client disconnected and is waiting for the specified time period
* before reconnecting. See the <code>minReconnectInterval</code> and
* <code>maxReconnectInterval</code> options.
*
* For each state, an event occurs when the client enters or leaves the state. Call
* [RTM.on(name, fn)]{@link RTM#on} method to add code that's executed when the client
* transitions into or out of a state. The syntax for the value of <code>name</code> is
*
* <code><strong>[ enter- | leave- ][ stopped | connecting | connected | awaiting ]</strong></code>
*
* For example, <code>RTM.on("enter-connected", myFunction)</code>. The next example also shows you
* how to call <code>RTM.on()</code>
*
* @example
* // Creates an RTM client
* var rtm = new RTM('YOUR_ENDPOINT', 'YOUR_APPKEY');
*
* // Creates a new subscription to the channel named 'your-channel'
* var subscription = rtm.subscribe('your-channel', RTM.SubscriptionMode.SIMPLE);
*
* // Adds a subscription event listener that logs messages to the console as they arrive.
* // The subscription receives all messages in the subscribed channel
* subscription.on('rtm/subscription/data', function (pdu) {
* pdu.body.messages.forEach(console.log);
* });
*
* // Sets a connection event listener that publishes a message to the channel named
* // <code>your-channel</code>
* // when the client is connected to RTM (the client enters the 'connected' state)
* rtm.on('enter-connected', function () {
* rtm.publish('your-channel', {key: 'value'});
* });
*
* // Sets a client event listener that checks incoming messages to see if they indicate an error.
* // <code>rtm.on()</code> is called for all incoming messages.
* rtm.on('data', function (pdu) {
* if (pdu.action.endsWith('/error')) {
* rtm.restart();
* }
* });
*
* // Starts the client
* rtm.start();
*
* @param {string} endpoint - WebSocket endpoint for RTM
* Available from the Dev Portal.
*
* @param {string} appkey - appkey used to access RTM
* Available from the Dev Portal.
*
* @param {object} opts - additional RTM client parameters
*
* @param {int} [opts.minReconnectInterval=1000] - minimum
* time period, in milliseconds, to wait between reconnection attempts
*
* @param {int} [opts.maxReconnectInterval=120000] - maximum
* time period, in milliseconds, to wait between reconnection attempts
*
* @param {('json'|'cbor')} [opts.protocol='json'] - WebSocket subprotocol to use
* <br>
* The SDK automatically converts messages to the subprotocol you choose. For example, if you
* specify <code>opts.protocol = 'cbor'</code> and then publish a JSON object, the SDK converts it
* to CBOR.
* <br>
* If you don't specify a subprotocol, RTM defaults to JSON subprotocol.
* @param {boolean} [opts.heartbeatEnabled=true] - enables periodic
* heartbeat monitoring for the WebSocket connection
*
* @param {object} [opts.authProvider] - object that manages authentication for the client.
* See {@link auth.js}
*
* @param {int} [opts.heartbeatInterval=60000] - interval,
* in milliseconds, to wait between heartbeat messages
* @param {int} [opts.highWaterMark=4194304] - 4MB. High water mark in bytes. If the number
* of bytes in the WebSocket write buffer exceeds this value,
* [writable]{@link RTM#writable} is set to <code>false</code>.
*
* @param {int} [opts.lowWaterMark=2097152] - 2MB. Low water mark, in bytes. If the
* WebSocket write buffer rises above <code>highWaterMark</code> and then drops below
* <code>lowWaterMark</code>, [writable]{@link RTM#writable} is set to <code>true</code>.
*
* @param {int} [opts.checkWritabilityInterval=100] - Interval,
* in milliseconds, between checks of the queue length and updates of the
* [writable]{@link RTM#writable} property if necessary.
*
* @param {object} [opts.proxyAgent] - proxy server agent.
* A custom http.Agent implementation like:
* https-proxy-agent https://github.com/TooTallNate/node-https-proxy-agent#ws-websocket-connection-example
* socks-proxy-agent https://github.com/TooTallNate/node-socks-proxy-agent#ws-websocket-connection-example
*
* @property {boolean} writable - A general indicator of the status of the write buffer.
* <code>true</code> indicates that the write buffer is shrinking, while <code>false</code>
* indicates that the write buffer is growing. Test <code>writable</code> to see whether you should
* continue to write or pause writing.
*
* @throws {TypeError} <code>TypeError</code> is thrown if mandatory parameters are
* missing or invalid.
*/
function RTM(endpoint, appkey, opts) {
  // Built-in settings; any key present in `opts` overrides the matching default.
  var defaults = {
    minReconnectInterval: 1000,
    maxReconnectInterval: 120000,
    heartbeatInterval: 60000,
    heartbeatEnabled: true,
    protocol: 'json',
    // 4 MB: when the socket's buffered byte count rises above this, `writable` turns false
    highWaterMark: 4 * 1024 * 1024,
    // 2 MB: when the buffered byte count falls below this, `writable` turns true again
    lowWaterMark: 2 * 1024 * 1024,
    checkWritabilityInterval: 100,
  };
  var versionedEndpoint;

  if (typeof endpoint !== 'string') {
    throw new TypeError('"endpoint" is missing or invalid');
  }
  if (typeof appkey !== 'string') {
    throw new TypeError('"appkey" is missing or invalid');
  }

  // Run the Observer constructor on this instance before any handler gets registered.
  Observer.call(this);

  this.options = objectAssign(defaults, opts);

  // Full connection URL: versioned endpoint followed by the appkey query parameter.
  versionedEndpoint = this._appendVersion(endpoint);
  this.endpoint = versionedEndpoint + '?appkey=' + appkey;

  // Connection bookkeeping.
  this.reconnectCount = 0;
  this.lastId = 0;
  this.ws = null;
  this.reconnectTimer = null;
  this.subscriptions = {};
  this.ackCallbacks = new CMap();

  // Passed as the `mask` option of every send; enabled only for endpoints
  // that _isEndpointSecure() does not report as secure.
  this.maskMessage = !this._isEndpointSecure(endpoint);

  // Keep this order: each initializer registers event handlers, and the
  // registration order is preserved from the original implementation.
  if (this.options.heartbeatEnabled) {
    this._initHeartbeatInterval();
  }
  this.writable = true;
  this._initWritableState();
  this._initConnectionFSM();

  // Default sink for 'error' events.
  this.on('error', logger.error);
}
// Expose the SDK's logger module as a static property, reachable as RTM.logger.
RTM.logger = logger;
/**
* @typedef SubscriptionMode
* @type {object}
*
* @property {boolean} trackPosition
* Tracks the stream position received from RTM. RTM includes the <code>position</code>
* parameter in responses to publish and subscribe requests and in subscription data messages.
* The SDK can attempt to resubscribe to the channel data stream from this position.
*
* @property {boolean} fastForward
* If necessary, RTM fast-forwards the subscription when the SDK resubscribes to a channel.
*
* To learn more about position tracking and fast-forwarding, see the sections "... with position"
* and "... with fast-forward (advanced)" in the chapter "Subscribing" in <em>Satori Docs</em>.
*/
/**
* Subscription modes.
*
* @namespace
* @readonly
*/
RTM.SubscriptionMode = {
  /**
   * Reliable mode: position tracking with fast-forward.
   *
   * RTM tracks the <code>position</code> value for the subscription and
   * tries to use it when resubscribing after the connection drops and the client reconnects.
   * If the <code>position</code> points to an expired message, RTM fast-forwards to the earliest
   * <code>position</code> that points to a non-expired message.
   *
   * This mode reliably goes to the next available message when RTM is resubscribing. However,
   * RTM always fast-forwards the subscription if necessary, so it never returns an error for an
   * 'out-of-sync' condition.
   *
   * To learn more about position tracking and fast-forwarding, see the sections "... with position"
   * and "... with fast-forward (advanced)" in the chapter "Subscribing" in <em>Satori Docs</em>.
   *
   * @type {SubscriptionMode}
   * @static
   * @constant
   * @readonly
   */
  RELIABLE: {
    trackPosition: true,
    fastForward: true,
  },
  /**
   * Simple mode: no position tracking, with fast-forward.
   *
   * RTM doesn't track the <code>position</code> value for the
   * subscription. Instead, when RTM resubscribes following a reconnection, it fast-forwards to
   * the earliest <code>position</code> that points to a non-expired message.
   *
   * Because RTM always fast-forwards the subscription, it never returns an error for an
   * 'out-of-sync' condition.
   *
   * To learn more about position tracking and fast-forwarding, see the sections "... with position"
   * and "... with fast-forward (advanced)" in the chapter "Subscribing" in <em>Satori Docs</em>.
   *
   * @type {SubscriptionMode}
   * @static
   * @constant
   * @readonly
   */
  SIMPLE: {
    trackPosition: false,
    fastForward: true,
  },
  /**
   * Advanced mode: position tracking without fast-forward.
   *
   * RTM always tracks the <code>position</code> value for the subscription and tries to
   * use it when resubscribing after the connection drops and the client reconnects.
   *
   * If the position points to an expired message, the resubscription attempt fails. RTM sends an
   * <code>expired_position</code> error and stops the subscription process.
   *
   * If the subscription is active, and RTM detects that the current <code>position</code> value
   * points to an expired message, the subscription is in an 'out-of-sync' state. In this case,
   * RTM sends an <code>out_of_sync</code> error and unsubscribes you.
   *
   * To learn more about position tracking and fast-forwarding, see the sections "... with position"
   * and "... with fast-forward (advanced)" in the chapter "Subscribing" in <em>Satori Docs</em>.
   *
   * @type {SubscriptionMode}
   * @static
   * @constant
   * @readonly
   */
  ADVANCED: {
    trackPosition: true,
    fastForward: false,
  },
};
// The modes are documented as read-only constants and are shared by every client, so
// freeze each mode and the namespace itself: an accidental write can no longer change
// the flags seen by later subscriptions. Freezing is done after the literal (rather than
// by wrapping it) so the member doc comments above stay attached to plain properties.
Object.keys(RTM.SubscriptionMode).forEach(function (modeName) {
  Object.freeze(RTM.SubscriptionMode[modeName]);
});
Object.freeze(RTM.SubscriptionMode);
/**
* Creates a role-based authentication provider for the client
* <p>
* The role-based authentication method is a two-step authentication process based on the HMAC
* process, using the MD5 hashing routine:
* <ul>
* <li>The client obtains a nonce from the server in a handshake request.</li>
* <li>The client then sends an authorization request with its role secret key hashed with the
* received nonce.</li>
* </ul>
* <p>
* To get a role secret key for your application, go to the Dev Portal.
*
* @param {string} role - role name set in the Dev Portal
*
* @param {string} roleSecret - role secret key
*
* @param {object} opts - additional authentication options
*
* @param {int} [opts.timeout=30000] - amount of time, in milliseconds, before the
* authentication operation times out
*
* @throws {TypeError} thrown if mandatory parameters are missing or invalid
*
* @return {function} authentication provider for the role-based authentication method
*/
RTM.roleSecretAuthProvider = function (role, roleSecret, opts) {
  // Thin static facade: the auth module builds (and validates) the provider.
  var provider = auth.roleSecretAuthProvider(role, roleSecret, opts);
  return provider;
};
// RTM inherits its event API from Observer. Replacing the prototype object discards the
// default `constructor` link, so it is restored explicitly; it is defined as
// non-enumerable so it does not show up when the prototype's keys are enumerated.
RTM.prototype = Object.create(Observer.prototype, {
  constructor: {
    value: RTM,
    enumerable: false,
    writable: true,
    configurable: true,
  },
});
/**
* Starts the client.
*
* The client begins to establish the WebSocket connection
* to RTM and then tracks the state of the connection. If the WebSocket
* connection drops for any reason, the JavaScript SDK attempts to reconnect.
*
*
* Use [RTM.on(name, fn)]{@link RTM#on} to define application functionality,
* for example, when the application enters or leaves the
* <code>connecting</code> or <code>connected</code> states.
*
* @return {void}
*/
RTM.prototype.start = function () {
  // Only a client in the stopped state can be started.
  var isRunning = this.state !== STOPPED;
  if (isRunning) {
    throw new Error('RTM is already started');
  }
  // Hand the request to the event machinery via the 'start' event.
  this.fire('start');
};
/**
* Stops the client. The RTM SDK starts to close the WebSocket connection and
* does not start it again unless you call [RTM.start()]{@link RTM#start}.
*
* Use this method to explicitly stop all interaction with RTM.
*
* Use [RTM.on("enter-stopped", function())]{@link RTM#on} or
* [RTM.on("leave-stopped", function())]{@link RTM#on} to
* provide code that executes when the client enters or leaves the <code>stopped</code> state.
*
* @return {void}
*/
RTM.prototype.stop = function () {
  // A client that is already stopped cannot be stopped again.
  var isAlreadyStopped = this.state === STOPPED;
  if (isAlreadyStopped) {
    throw new Error('RTM is already stopped');
  }
  // Hand the request to the event machinery via the 'stop' event.
  this.fire('stop');
};
/**
* Calls [RTM.stop()]{@link RTM#stop} followed by [RTM.start()]{@link RTM#start} to
* restart the client. RTM issues events for these client states, which you can handle with code in
* [RTM.on(name, function())]{@link RTM#on}.
*
* @return {void}
*/
RTM.prototype.restart = function () {
  var client = this;
  // stop() throws if the client is already stopped, in which case start() is never reached.
  client.stop();
  client.start();
};
/**
* Returns <code>true</code> if the RTM client is in the <code>stopped</code> state.
*
* @return {boolean} <code>true</code> if the client is in the <code>stopped</code> state,
* otherwise <code>false</code>
*/
RTM.prototype.isStopped = function () {
  // Strict comparison of the current state against the 'stopped' state name.
  var current = this.state;
  return STOPPED === current;
};
/**
* Returns <code>true</code> if the client is in the <code>connected</code> state.
*
* In this state, the WebSocket connection to RTM is established and any requested authentication
* has completed successfully.
*
* @return {boolean} <code>true</code> if the client is in the <code>connected</code> state,
* otherwise <code>false</code>
*/
RTM.prototype.isConnected = function () {
  // Strict comparison of the current state against the 'connected' state name.
  var current = this.state;
  return CONNECTED === current;
};
/**
 * Returns the existing [Subscription]{@link Subscription} object for the specified subscription id.
 *
 * @param {string} subscriptionId - the id for an existing [Subscription]{@link Subscription} object
 *
 * @throws {TypeError} thrown if <code>subscriptionId</code> is missing or is not a string
 *
 * @return {Subscription|undefined} the [Subscription]{@link Subscription} object, or
 * <code>undefined</code> if the client holds no subscription with that id
 */
RTM.prototype.getSubscription = function (subscriptionId) {
  if (typeof subscriptionId !== 'string') {
    throw new TypeError('"subscriptionId" is missing or invalid');
  }
  // The registry is a plain object, so a bare property read would also return inherited
  // Object.prototype members for ids such as "toString" or "constructor".
  if (!Object.prototype.hasOwnProperty.call(this.subscriptions, subscriptionId)) {
    return undefined;
  }
  return this.subscriptions[subscriptionId];
};
/**
* Creates a subscription to the specified channel.
*
* When you create a subscription, you can specify additional options in the
* <code>bodyOpts</code> parameter. For example, you can specify a streamview or specify what the
* SDK does when it resubscribes after a reconnection.
*
* The callback function you specify for the subscription always receives PDUs in the form of
* JavaScript objects.
*
* @param {string} channelOrSubId - Contains a channel name or a subscription id. If you don't
* specify the <code>filter</code> field in <code>bodyOpts</code>, specify the channel name.
* Otherwise, specify the channel name in the stream SQL in the <code>filter</code> field, and
* specify a subscription id in <code>channelOrSubId</code>.
*
* @param {RTM.SubscriptionMode} mode - Contains flags that determine the resubscribe behavior of
* the RTM SDK and RTM. See [SubscriptionMode]{@link SubscriptionMode}.
*
* @param {object} [bodyOpts={}] - Contains additional options for the subscription
*
* @param {boolean} [bodyOpts.force=false] - Determines how RTM should act if the subscribe request
* contains a <code>subscription_id</code> that already exists. If true, RTM re-subscribes or
* creates a new subscription, depending on the specified subscription parameters. If false, RTM
* returns an error.
*
* @param {boolean} [bodyOpts.fast_forward=false] - Determines how RTM should act if it detects
* that the next message position for the subscription is pointing to an expired message (out of
* sync condition). If true, RTM moves the next message position to the least recent un-expired
* message in the channel. If false, RTM returns an error and terminates the subscription.
*
* @param {int} [bodyOpts.position] - Position of a message in the channel. If you don't
* specify the <code>history</code> field, RTM uses this position as the next message position
* for the subscription. Otherwise, RTM interprets the value in <code>history</code> as an offset
* from the value of <code>position</code>.
*
* @param {object} [bodyOpts.history] - Object that contains history parameters.
*
* @param {int} [bodyOpts.history.count] - Offset from a message position, specified as a
* number of messages. RTM starts the subscription this many messages before the position that the
* subscription otherwise starts with. If you specify <code>bodyOpts.position</code>, RTM uses that
* position as the starting point for the offset. <strong>Note:</strong> If you specify
* <code>count</code>, you can't specify <code>age</code>.
*
* @param {int} [bodyOpts.history.age] - Offset from a message position, specified as a
* duration in seconds. RTM starts the subscription at the least recent message that's this
* many seconds older than the position that the subscription otherwise starts with. If you
* specify <code>bodyOpts.position</code>, RTM uses that position as the starting point for the
* offset. <strong>Note:</strong> If you specify <code>age</code>, you can't specify
* <code>count</code>.
*
* @param {string} [bodyOpts.filter] - Contains a stream SQL statement that selects,
* transforms, or aggregates messages in the specified channel
*
* @param {int} [bodyOpts.period] - Specifies a duration in seconds for each partition
* in an aggregate view. The maximum value is 60 (1 minute).
*
* @throws {TypeError} thrown if mandatory parameters are missing or invalid.
*
* @return {Subscription} - subscription object
*
* @example
* // Creates a new RTM client
* var rtm = new RTM('YOUR_ENDPOINT', 'YOUR_APPKEY');
*
* // Creates a subscription with the name 'your-channel'
* var subscription = rtm.subscribe('your-channel', RTM.SubscriptionMode.SIMPLE);
*
* // Writes incoming messages to the log
* subscription.on('rtm/subscription/data', function (pdu) {
* pdu.body.messages.forEach(console.log);
* });
*
* // Starts the client
* rtm.start();
*
* @example
* // Creates a new RTM client
* var rtm = new RTM('YOUR_ENDPOINT', 'YOUR_APPKEY');
*
* // Subscribes to the channel named 'my-channel' using a streamview
* var subscription = rtm.subscribe('my-filter', RTM.SubscriptionMode.SIMPLE, {
* filter: 'SELECT * FROM my-channel WHERE object.param >= 1 OR object.id == 0',
* });
*
* // Writes incoming messages to the log
* subscription.on('rtm/subscription/data', function (pdu) {
* pdu.body.messages.forEach(console.log);
* });
*
* // Sets a client event listener, for unsolicited subscription PDUs, that reacts to an error PDU
* // by restarting the client connection. The PDU is passed as a parameter.
* rtm.on('data', function (pdu) {
* if (pdu.action.endsWith('/error')) {
* rtm.restart();
* }
* });
*
* rtm.start();
*
* @see {@link RTM.SubscriptionMode.SIMPLE}
* @see {@link RTM.SubscriptionMode.RELIABLE}
* @see {@link RTM.SubscriptionMode.ADVANCED}
*/
RTM.prototype.subscribe = function (channelOrSubId, mode, bodyOpts) {
  // Validates the arguments, registers a new Subscription under `channelOrSubId`, and,
  // when the client is already connected, immediately sends the subscribe request.
  // When the client is not connected, the subscription is only registered here.
  var hasOwn = Object.prototype.hasOwnProperty;
  var modeMandatoryKeys = ['fastForward', 'trackPosition'];
  var isValidMode;
  var subscription;
  var pdu;
  var opts;
  if (typeof channelOrSubId !== 'string') {
    throw new TypeError('"channelOrSubId" is missing or invalid');
  }
  // `typeof null` is 'object', so null is rejected explicitly; otherwise the key check
  // below would fail with an unrelated TypeError instead of the descriptive one.
  isValidMode = mode !== null && typeof mode === 'object' &&
    modeMandatoryKeys.every(function (key) {
      return hasOwn.call(mode, key);
    });
  if (!isValidMode) {
    throw new TypeError('Subscription mode has incorrect value: ' + mode + '\n' +
      'Mode should contains the following mandatory fields: ' + modeMandatoryKeys.join(', ') + '\n' +
      'See also: RTM.SubscriptionMode.SIMPLE, RTM.SubscriptionMode.ADVANCED, RTM.SubscriptionMode.RELIABLE');
  }
  // Own-property check: the registry is a plain object, so a truthiness test would treat
  // inherited names such as "toString" as already-existing subscriptions.
  if (hasOwn.call(this.subscriptions, channelOrSubId)) {
    // Report the offending id (the `subscription` variable is not assigned yet here).
    throw new Error('Cannot create subscription ' + channelOrSubId + ' twice');
  }
  // The Subscription receives the mode flags plus the extra body options under `bodyOpts`.
  opts = objectAssign({}, mode, { bodyOpts: bodyOpts });
  subscription = new Subscription(channelOrSubId, opts);
  this.subscriptions[channelOrSubId] = subscription;
  if (this.isConnected()) {
    pdu = subscription.subscribePdu(this._nextId());
    // The reply to the subscribe request is forwarded to the subscription itself.
    this._send(pdu, function (replyPdu) {
      subscription.onPdu(replyPdu);
    });
  }
  return subscription;
};
/**
* Updates an existing [Subscription]{@link Subscription} object. Existing
* [Subscription]{@link Subscription} event handlers are copied to the updated object.
*
* Use this method to change an existing subscription. For example, use it to add or change a
* streamview.
*
* @param {string} channelOrSubId - subscription id or channel name for
* the existing subscription
*
* @param {RTM.SubscriptionMode} mode - Contains flags that determine the resubscribe behavior of
* the RTM SDK and RTM. See [SubscriptionMode]{@link SubscriptionMode}.
*
* @param {Object} bodyOpts
* Properties for the updated <code>Subscription</code> object. See
* [RTM.subscribe(channelOrSubId, mode, bodyOpts)]{@link RTM#subscribe} for the supported property names.
*
* @param {Function} [onCompleted]
* function to execute on the updated <code>Subscription</code> object
*
* @throws {TypeError} thrown if mandatory parameters are missing or invalid.
*
* @return {void}
*/
RTM.prototype.resubscribe = function (channelOrSubId, mode, bodyOpts, onCompleted) {
  var client = this;
  var previous;
  if (typeof channelOrSubId !== 'string') {
    throw new TypeError('"channelOrSubId" is missing or invalid');
  }
  // Grab the current subscription first: unsubscribe() removes it from the registry.
  previous = client.subscriptions[channelOrSubId];
  // Once the old subscription is gone, create the replacement and let it share the
  // previous subscription's handlers object.
  client.unsubscribe(channelOrSubId, function () {
    var replacement = client.subscribe(channelOrSubId, mode, bodyOpts);
    replacement.handlers = previous.handlers;
    if (!onCompleted) {
      return;
    }
    onCompleted(replacement);
  });
};
/**
* Removes the specified subscription.
*
* @param {string} subscriptionId - Subscription id or channel name.
*
* @param {Function} [onAck]
* Callback function that's invoked when RTM responds to the unsubscribe request. RTM passes the
* response PDU to this function. If you don't specify <code>onAck</code>, RTM doesn't send a
* response PDU.
*
* @throws {TypeError} thrown if required parameters are missing or invalid
*
* @return {void}
*/
RTM.prototype.unsubscribe = function (subscriptionId, onAck) {
  // Removes the subscription registered under `subscriptionId`. If the subscription is
  // currently subscribed, an unsubscribe request is sent first and the removal happens
  // when the reply arrives; otherwise the removal happens synchronously.
  var self = this;
  var sub;
  var onUnsubscribed;
  var pdu;
  if (typeof subscriptionId !== 'string') {
    throw new TypeError('"subscriptionId" is missing or invalid');
  }
  // Own-property lookup: the registry is a plain object, so a bare read would resolve
  // ids such as "toString" to inherited Object.prototype members and wrongly treat them
  // as existing subscriptions.
  sub = Object.prototype.hasOwnProperty.call(self.subscriptions, subscriptionId)
    ? self.subscriptions[subscriptionId]
    : undefined;
  if (!sub) {
    throw new Error('Unknown subscription ' + subscriptionId);
  }
  // Invoked with the reply PDU when rtm/unsubscribe/(ok|error) is returned, or
  // immediately and without an argument when no request has to be sent.
  onUnsubscribed = function (unsubscribeReplyPdu) {
    if (unsubscribeReplyPdu) {
      // Let the subscription see the reply before it is dropped from the registry.
      sub.onPdu(unsubscribeReplyPdu);
    }
    delete self.subscriptions[subscriptionId];
    if (onAck) {
      onAck(unsubscribeReplyPdu);
    }
  };
  if (sub.isSubscribed) {
    pdu = sub.unsubscribePdu(self._nextId());
    self._send(pdu, onUnsubscribed);
  } else {
    onUnsubscribed();
  }
};
/**
* Publishes a message to a channel. The client must be connected.
*
* @example
* // Publishes to the channel named "channel", and provides a callback function that's invoked when
* // RTM responds to the request. If the PDU "action" value doesn't end with "ok", the function
* // logs an error.
* rtm.publish('channel', {key: 'value'}, function (pdu) {
* if (!pdu.action.endsWith('/ok')) {
* console.log('something went wrong');
* }
* });
*
* @param {string} channel - channel name
*
* @param {JSON | Uint8Array} message
* <br>
* JSON object or binary data containing the message to publish
* <br>
* The type you choose is independent of the subprotocol you're using for your client. The SDK
* automatically converts the message to the correct format before sending it to RTM.
*
* @param {Function} [onAck]
* Callback function that's invoked when RTM responds to the publish request. RTM passes the
* response PDU to this function. If you don't specify <code>onAck</code>, RTM doesn't send a
* response PDU.
*
* @throws {TypeError} thrown if required parameters are missing or invalid
*
* @return {void}
*/
RTM.prototype.publish = function (channel, message, onAck) {
  var body;
  if (typeof channel !== 'string') {
    throw new TypeError('"channel" is missing or invalid');
  }
  // Only `undefined` counts as missing; null, '', 0 and false are valid messages.
  if (message === undefined) {
    throw new TypeError('"message" is missing');
  }
  body = { channel: channel, message: message };
  // _send() refuses to send unless the client is connected.
  return this._send({ action: 'rtm/publish', body: body }, onAck);
};
/**
* Reads the latest message written to a specific channel, as a Protocol
* Data Unit (<strong>PDU</strong>). The client must be connected.
*
* The callback function you specify receives a PDU in the same format as the
* subprotocol you specify in the client constructor {@link RTM}. RTM automatically converts
* messages.
*
* @variation 1
*
* @param {string} channel - name of the channel to read from
*
* @param {Function} [onAckOrOpts]
* Callback function that's invoked when RTM responds to the read request. RTM passes the
* response PDU to this function. If you don't specify <code>onAck</code>, RTM doesn't send a
* response PDU.
*
* @example
* // Reads from the channel named 'channel' and prints the response PDU
* rtm.read('channel', function (pdu) {
* console.log(pdu);
* })
*
* @throws {TypeError} thrown if required parameters are missing or invalid
*
* @return {void}
*
* @also
*
* Reads the latest message written to specific channel, as a Protocol
* Data Unit (<strong>PDU</strong>). The client must be connected.
*
* @variation 2
*
* @param {string} channel - name of the channel to read from
*
* @param {object} [opts={}]
* Additional options in the read PDU that's sent to RTM in the request.
* For more information, see the section "Read PDU" in the "RTM API" chapter of <em>Satori Docs</em>.
*
* @param {object} [opts.bodyOpts={}]
* Additional options in the <code>body</code> element of the read PDU that's sent to
* RTM in the request.
*
* @param {Function} [opts.onAck]
* Callback function that's invoked when RTM responds to the read request. RTM passes the
* response PDU to this function. If you don't specify <code>onAck</code>, RTM doesn't send a
* response PDU.
*
* @example
* // Reads from the channel named 'channel', starting at the position specified by the
* // "position" key.
* // Prints the response PDU.
* rtm.read('channel', {
* bodyOpts: { position: '1485444476:0' },
* onAck: function (pdu) {
* console.log(pdu);
* }
* })
*
* @throws {TypeError} thrown if required parameters are missing or invalid
*
* @return {void}
*/
RTM.prototype.read = function (channel, onAckOrOpts) {
  // Builds an 'rtm/read' request for `channel`. The second argument is either an
  // ack callback or an options object of the form { bodyOpts, onAck }; it may be omitted.
  var command;
  var opts;
  if (typeof channel !== 'string') {
    throw new TypeError('"channel" is missing or invalid');
  }
  if (typeof onAckOrOpts === 'function') {
    opts = { onAck: onAckOrOpts };
  } else {
    // The argument is optional: fall back to an empty options object so that
    // read('channel') does not fail while dereferencing `undefined`.
    opts = onAckOrOpts || {};
  }
  command = {
    action: 'rtm/read',
    // `channel` is applied last, so it wins over any `channel` key inside bodyOpts.
    body: objectAssign({}, opts.bodyOpts, { channel: channel }),
  };
  return this._send(command, opts.onAck);
};
/**
* Writes a value to the specified channel. The client must be connected.
*
* @param {string} channel - name of the channel to write to
*
* @param {JSON | Uint8Array} value
* <br>
* JSON object or binary data containing the message to write
* <br>
* The type you choose is independent of the subprotocol you're using for your client. The SDK
* automatically converts the message to the correct format before sending it to RTM.
*
* @param {Function} [onAck]
* Callback function that's invoked when RTM responds to the write request. RTM passes the
* response PDU to this function. If you don't specify <code>onAck</code>, RTM doesn't send a
* response PDU.
*
* @example
* // Writes the string 'value' to the channel named 'channel' and prints the response PDU.
* rtm.write('channel', 'value', function (pdu) {
* console.log(pdu);
* })
*
* @throws {TypeError} thrown if required parameters are missing or invalid
*
* @return {void}
*/
RTM.prototype.write = function (channel, value, onAck) {
  var body;
  if (typeof channel !== 'string') {
    throw new TypeError('"channel" is missing or invalid');
  }
  // Only `undefined` is rejected; null and other falsy values are written as-is.
  if (value === undefined) {
    throw new TypeError('"value" is missing or invalid');
  }
  // The value travels in the `message` field of the request body.
  body = { channel: channel, message: value };
  return this._send({ action: 'rtm/write', body: body }, onAck);
};
/**
* Deletes the value for the associated channel. The [RTM]{@link RTM} client must be connected.
*
* @param {string} channel - Channel name.
*
* @param {Function} [onAck]
* Callback function that's invoked when RTM responds to the delete request. RTM passes the
* response PDU to this function. If you don't specify <code>onAck</code>, RTM doesn't send a
* response PDU.
*
* @example
* rtm.delete('channel', function (pdu) {
* console.log(pdu);
* })
*
* @throws {TypeError} thrown if required parameters are missing or invalid
*
* @return {void}
*/
RTM.prototype.delete = function (channel, onAck) {
  if (typeof channel !== 'string') {
    throw new TypeError('"channel" is missing or invalid');
  }
  // The delete request carries only the channel name.
  return this._send({ action: 'rtm/delete', body: { channel: channel } }, onAck);
};
// Private methods
RTM.prototype._initHeartbeatInterval = function () {
var self = this;
var heartbeatTimer;
var interval = this.options.heartbeatInterval;
this.on('open', function () {
var pingTimestamp = 0;
var pongTimestamp = 0;
heartbeatTimer = setInterval(function () {
if (pongTimestamp < pingTimestamp) {
self._disconnect();
return;
}
pingTimestamp = Date.now();
self.publish('$heartbeat', '', function () {
pongTimestamp = Date.now();
});
}, interval);
});
this.on('close', function () {
clearInterval(heartbeatTimer);
});
};
/**
 * Registers the 'open' and 'close' handlers that maintain the <code>writable</code> flag.
 *
 * On 'open' the flag is set to true and a timer re-evaluates it every
 * <code>options.checkWritabilityInterval</code> milliseconds; on 'close' the timer is
 * cleared and the flag is set to false.
 *
 * @private
 * @return {void}
 */
RTM.prototype._initWritableState = function () {
  var client = this;
  var period = this.options.checkWritabilityInterval;
  var pollTimer;

  function poll() {
    client._checkWritableState();
  }

  this.on('open', function () {
    client._setWritableState(true);
    pollTimer = setInterval(poll, period);
  });

  this.on('close', function () {
    clearInterval(pollTimer);
    client._setWritableState(false);
  });
};
/**
 * Re-evaluates the <code>writable</code> flag against the socket's
 * <code>bufferedAmount</code>: a writable client becomes non-writable once the buffer
 * exceeds <code>options.highWaterMark</code>, and a non-writable client becomes writable
 * again once the buffer drops below <code>options.lowWaterMark</code>. Values between
 * the two marks leave the flag unchanged.
 *
 * @private
 * @return {void}
 */
RTM.prototype._checkWritableState = function () {
  var buffered = this.ws.bufferedAmount;
  if (this.writable) {
    if (buffered > this.options.highWaterMark) {
      this._setWritableState(false);
    }
    return;
  }
  if (buffered < this.options.lowWaterMark) {
    this._setWritableState(true);
  }
};
/**
 * Stores a new value for the <code>writable</code> flag and fires 'change-writability'
 * with that value. Does nothing when the flag already has the requested value.
 *
 * @private
 * @param {boolean} newState - requested value of the flag
 * @return {void}
 */
RTM.prototype._setWritableState = function (newState) {
  if (this.writable === newState) {
    return;
  }
  this.writable = newState;
  this.fire('change-writability', this.writable);
};
/**
 * Opens a new WebSocket to <code>this.endpoint</code> using the configured subprotocol,
 * stores it in <code>this.ws</code> and translates the socket callbacks into client
 * events: 'open', 'data' (after the decoded PDU has been routed through _recv),
 * 'error' and 'close'.
 *
 * @private
 * @return {void}
 */
RTM.prototype._connect = function () {
  var client = this;
  var settings = this.options;
  var socket;

  logger.debug('Connecting to', this.endpoint, '(' + settings.protocol + ')');
  if ('proxyAgent' in settings) {
    logger.debug(' (using proxy agent)');
  }

  socket = new W3CWebSocket(this.endpoint, settings.protocol, {
    perMessageDeflate: false,
    agent: settings.proxyAgent,
  });
  this.ws = socket;
  socket.binaryType = 'arraybuffer';

  socket.onopen = function () {
    client.fire('open');
  };

  socket.onmessage = function (message) {
    var pdu;
    // Decoding or handling failures are reported as 'error' events instead of escaping
    // from the socket callback.
    try {
      pdu = client._decode(message.data);
      client._recv(pdu);
      client.fire('data', pdu);
    } catch (failure) {
      client.fire('error', failure);
    }
  };

  socket.onerror = function (error) {
    client.fire('error', error);
  };

  socket.onclose = function () {
    // Ignore the close of a socket that is no longer the client's current one.
    if (client.ws !== socket) {
      return;
    }
    client.ws = null;
    client.fire('close');
  };
};
/**
 * Serializes an outgoing PDU for the subprotocol negotiated on the current socket:
 * CBOR when <code>ws.protocol</code> is 'cbor', a JSON string otherwise. The outgoing
 * PDU is written to the debug log (the object for CBOR, the JSON text for JSON).
 *
 * @private
 * @param {object} obj - PDU to serialize
 * @return {*} the value returned by CBOR.encode, or a JSON string
 */
RTM.prototype._encode = function (obj) {
  // Decide by the subprotocol the server selected, not by the requested one.
  var usesCbor = this.ws.protocol === 'cbor';
  var encoded = usesCbor ? CBOR.encode(obj) : JSON.stringify(obj);
  logger.debug('>>>', usesCbor ? obj : encoded);
  return encoded;
};
/**
 * Parses an incoming WebSocket payload according to the subprotocol negotiated on the
 * current socket and writes it to the debug log (the decoded object for CBOR, the raw
 * text for JSON).
 *
 * @private
 * @param {ArrayBuffer|string} data - raw payload; an ArrayBuffer for CBOR, a string for JSON
 * @throws {TypeError} thrown if the payload type does not match the subprotocol
 * @return {*} the decoded PDU
 */
RTM.prototype._decode = function (data) {
  var decoded;
  // Decide by the subprotocol the server selected, not by the requested one.
  if (this.ws.protocol === 'cbor') {
    if (!(data instanceof ArrayBuffer)) {
      throw new TypeError('CBOR protocol expects ArrayBuffer as incoming data from WebSocket');
    }
    decoded = CBOR.decode(data);
    logger.debug('<<<', decoded);
    return decoded;
  }
  if (typeof data !== 'string') {
    throw new TypeError('JSON protocol expects string as incoming data from WebSocket');
  }
  decoded = JSON.parse(data);
  logger.debug('<<<', data);
  return decoded;
};
/**
 * Sends a PDU, but only while the client is in the connected state.
 *
 * @private
 * @param {object} pdu - PDU to send
 * @param {Function} [onAck] - callback for the reply to this PDU
 * @throws {Error} thrown if the client is not connected
 * @return {*} whatever _unsafeSend returns
 */
RTM.prototype._send = function (pdu, onAck) {
  if (this.isConnected()) {
    return this._unsafeSend(pdu, onAck);
  }
  throw new Error('Client is not connected');
};
/**
 * Encodes and writes a PDU to the current socket without checking the connection state.
 *
 * The PDU is shallow-copied first, so the caller's object is never modified. When an ack
 * callback is given, the copy keeps its own <code>id</code> or gets a fresh one, and the
 * callback is stored under that id. Errors thrown by the socket's send are reported as
 * 'error' events. The writable flag is re-evaluated afterwards.
 *
 * @private
 * @param {object} origPdu - PDU to send
 * @param {Function} [onAck] - callback for the reply to this PDU
 * @return {void}
 */
RTM.prototype._unsafeSend = function (origPdu, onAck) {
  var outgoing = objectAssign({}, origPdu);
  var payload;
  if (onAck) {
    if (!('id' in outgoing)) {
      outgoing.id = this._nextId();
    }
    this.ackCallbacks.set(outgoing.id, onAck);
  }
  payload = this._encode(outgoing);
  try {
    this.ws.send(payload, { mask: this.maskMessage });
  } catch (sendError) {
    this.fire('error', sendError);
  }
  this._checkWritableState();
};
/**
 * Dispatches a decoded incoming PDU.
 *
 * Two independent steps are performed:
 *  1. If the body carries a `subscription_id` and the action starts with
 *     `rtm/subscription/`, the PDU is handed to the matching entry of
 *     `this.subscriptions` (when one exists).
 *  2. If the PDU has an `id` with a registered ack callback, the callback is
 *     invoked. The callback stays registered only when the action ends with
 *     `/data`; for any other action it is removed before being called.
 *
 * @private
 * @param {Object} pdu - decoded PDU.
 * @returns {void}
 */
RTM.prototype._recv = function (pdu) {
  // Prefix/suffix checks are hand-rolled because `startsWith`/`endsWith` are ES6-only.
  var SUBSCRIPTION_PREFIX = 'rtm/subscription/';
  var DATA_SUFFIX = '/data';
  var body = pdu.body;
  var target;
  var onAck;
  if (body && ('subscription_id' in body)) {
    if (pdu.action.substr(0, SUBSCRIPTION_PREFIX.length) === SUBSCRIPTION_PREFIX) {
      target = this.subscriptions[body.subscription_id];
      if (target) {
        target.onPdu(pdu);
      }
    }
  }
  if (!('id' in pdu)) {
    return;
  }
  onAck = this.ackCallbacks.get(pdu.id);
  if (!onAck) {
    return;
  }
  if (pdu.action.indexOf(DATA_SUFFIX, pdu.action.length - DATA_SUFFIX.length) === -1) {
    // Not a `/data` action: drop the callback before invoking it.
    this.ackCallbacks.delete(pdu.id);
  }
  onAck(pdu);
};
/**
 * Drops the current socket, if any, and forgets all pending ack callbacks.
 *
 * The socket's `onclose` handler is invoked directly, then detached before
 * `close()` is called, so it is not run a second time by the actual close.
 *
 * @private
 * @returns {void}
 */
RTM.prototype._disconnect = function () {
  var socket = this.ws;
  if (socket) {
    // Order matters: run the handler, detach it, then close the socket.
    socket.onclose();
    socket.onclose = null;
    socket.close();
  }
  this.ackCallbacks.clear();
};
/**
 * Advances the `this.lastId` counter by one and returns the new value.
 *
 * @private
 * @returns {number} the freshly allocated id.
 */
RTM.prototype._nextId = function () {
  var id = this.lastId + 1;
  this.lastId = id;
  return id;
};
/**
 * Computes the delay before the next reconnect attempt.
 *
 * The delay is `minReconnectInterval * 2^n` plus a random jitter in
 * `[0, minReconnectInterval)`, capped at `maxReconnectInterval`, where `n` is
 * `this.reconnectCount` limited to 30.
 *
 * @private
 * @returns {number} the delay, in the same unit as the interval options.
 */
RTM.prototype._nextReconnectInterval = function () {
  var MAX_EXPONENT = 30;
  var settings = this.options;
  var base = settings.minReconnectInterval;
  var exponent = Math.min(this.reconnectCount, MAX_EXPONENT);
  var jitter = Math.random() * base;
  return Math.min(settings.maxReconnectInterval, jitter + (base * Math.pow(2, exponent)));
};
/**
 * Calls `fn(subscriptionId, subscription)` for every entry of
 * `this.subscriptions`.
 *
 * The key list is captured before iterating; an entry that is removed while
 * the iteration is in progress is skipped.
 *
 * @private
 * @param {Function} fn - visitor invoked with the id and the subscription.
 * @returns {void}
 */
RTM.prototype._forEachSubscription = function (fn) {
  var ids = Object.keys(this.subscriptions);
  var index;
  var id;
  for (index = 0; index < ids.length; index += 1) {
    id = ids[index];
    // Re-check on every step: the visitor may have removed this entry meanwhile.
    if ({}.hasOwnProperty.call(this.subscriptions, id)) {
      fn(id, this.subscriptions[id]);
    }
  }
};
/**
 * Sends a subscribe PDU for every known subscription.
 *
 * Each PDU is built by the subscription itself from a freshly allocated id;
 * the reply is routed back to that subscription's `onPdu`.
 *
 * @private
 * @returns {void}
 */
RTM.prototype._subscribeAll = function () {
  var client = this;
  function resubscribe(subscriptionId, subscription) {
    var request = subscription.subscribePdu(client._nextId());
    client._send(request, function (reply) {
      subscription.onPdu(reply);
    });
  }
  this._forEachSubscription(resubscribe);
};
/**
 * Notifies every known subscription that the connection was lost by calling
 * its `onDisconnect()`.
 *
 * @private
 * @returns {void}
 */
RTM.prototype._disconnectAll = function () {
  function notify(subscriptionId, subscription) {
    subscription.onDisconnect();
  }
  this._forEachSubscription(notify);
};
/**
 * Tells whether an endpoint uses the secure WebSocket scheme.
 *
 * The scheme is matched case-insensitively because URI schemes are
 * case-insensitive: `WSS://host` is as secure as `wss://host`.
 *
 * @private
 * @param {string} endpoint - endpoint URL to inspect.
 * @returns {boolean} `true` when the endpoint starts with `wss://` in any
 *   letter case, `false` otherwise.
 */
RTM.prototype._isEndpointSecure = function (endpoint) {
  return /^wss:\/\//i.test(endpoint);
};
// Connection Finite State Machine
//
/**
 * STOPPED state: the client holds no connection and waits for `start`.
 * Handlers are invoked with the RTM instance as `this`.
 */
STATES[STOPPED] = {
  // Entering this state tears down whatever socket is still around.
  _enter: function enterStopped() {
    this._disconnect();
  },
  _leave: function leaveStopped() {},
  // `start` is the only event that moves the client out of this state.
  start: function onStart() {
    this._transition(CONNECTING);
  },
  // A `close` event needs no reaction here; the empty handler keeps it silent.
  close: function onClose() {},
};
/**
 * CONNECTING state: a socket is being opened and, when an auth provider is
 * configured, authenticated. Handlers are invoked with the RTM instance as
 * `this`.
 */
STATES[CONNECTING] = {
  // A synchronous failure to create the socket is reported as an `error` event.
  _enter: function enterConnecting() {
    try {
      this._connect();
    } catch (connectError) {
      this.fire('error', connectError);
    }
  },
  _leave: function leaveConnecting() {},
  // The socket is open: restart id numbering, then authenticate if required.
  open: function onOpen() {
    var client = this;
    this.lastId = 0;
    if (!this.options.authProvider) {
      this._transition(CONNECTED);
      return;
    }
    this.options.authProvider(
      this,
      function onAuthSuccess() {
        client.fire('authenticated');
        client._transition(CONNECTED);
      },
      function onAuthFailure(reason) {
        client.fire('error', reason);
      }
    );
  },
  // Both `error` and `close` schedule a new attempt through AWAITING.
  error: function onError() {
    this._transition(AWAITING);
  },
  close: function onClose() {
    this._transition(AWAITING);
  },
  stop: function onStop() {
    this._transition(STOPPED);
  },
};
/**
 * CONNECTED state: the socket is open and usable. Handlers are invoked with
 * the RTM instance as `this`.
 */
STATES[CONNECTED] = {
  // A successful connection resets the reconnect counter and re-sends every subscription.
  _enter: function enterConnected() {
    this.reconnectCount = 0;
    this._subscribeAll();
  },
  // Leaving for any reason tells the subscriptions that the link is gone.
  _leave: function leaveConnected() {
    this._disconnectAll();
  },
  close: function onClose() {
    this._transition(AWAITING);
  },
  stop: function onStop() {
    this._transition(STOPPED);
  },
};
/**
 * AWAITING state: the connection is down and a reconnect attempt is scheduled.
 * Handlers are invoked with the RTM instance as `this`.
 */
STATES[AWAITING] = {
  // Drop the old socket, then arm a timer that bumps the retry counter and reconnects.
  _enter: function enterAwaiting() {
    var client = this;
    this._disconnect();
    this.reconnectTimer = setTimeout(function () {
      client.reconnectCount += 1;
      client._transition(CONNECTING);
    }, this._nextReconnectInterval());
  },
  // Cancel the pending attempt, if any, and forget the timer handle.
  _leave: function leaveAwaiting() {
    if (this.reconnectTimer !== null) {
      clearTimeout(this.reconnectTimer);
    }
    this.reconnectTimer = null;
  },
  stop: function onStop() {
    this._transition(STOPPED);
  },
  // A `close` event needs no reaction here; the empty handler keeps it silent.
  close: function onClose() { },
};
/**
 * Puts the client into the STOPPED state and then routes the connection
 * events (`open`, `close`, `error`, `start`, `stop`, `reconnect`) to the
 * handler table of whichever state is current when the event fires.
 *
 * Event arguments are not forwarded: only the event name reaches the state
 * machine.
 *
 * @private
 * @returns {void}
 */
RTM.prototype._initConnectionFSM = function () {
  var client = this;
  function route(eventName) {
    client.on(eventName, function () {
      client._safelyCallFSMEvent(eventName);
    });
  }
  // The initial transition happens before any listener is attached.
  this._transition(STOPPED);
  ['open', 'close', 'error', 'start', 'stop', 'reconnect'].forEach(route);
};
/**
 * Runs the handler that the current state defines for `ev`, if there is one.
 *
 * The handler is called with the client as `this` and no arguments. An
 * exception thrown by it is logged and swallowed; a missing handler only
 * produces a warning.
 *
 * @private
 * @param {string} ev - event or hook name (`_enter`, `_leave`, `open`, ...).
 * @returns {void}
 */
RTM.prototype._safelyCallFSMEvent = function (ev) {
  var handler = STATES[this.state][ev];
  logger.debug('FSM event:', ev, 'Current state:', this.state);
  if (!handler) {
    logger.warn('Nothing to do for event', ev, this.state);
    return;
  }
  try {
    handler.call(this);
  } catch (error) {
    logger.error(error, 'Unexpected error during event callback call', this.state, ev);
  }
};
/**
 * Moves the state machine to `newState`.
 *
 * If a state is already set, its `_leave` hook runs and `leave-<state>` is
 * fired. Then the state is replaced, the new state's `_enter` hook runs and
 * `enter-<state>` is fired.
 *
 * Event names are built from `this.state` as it is at the moment of firing, so
 * a hook that itself triggers a transition changes the name that is emitted.
 *
 * @private
 * @param {string} newState - key of the target state in the state table.
 * @returns {void}
 */
RTM.prototype._transition = function (newState) {
  var hasCurrentState;
  logger.debug('Transition from', this.state, 'to', newState);
  hasCurrentState = Boolean(this.state);
  if (hasCurrentState) {
    this._safelyCallFSMEvent('_leave');
    this.fire('leave-' + this.state);
  }
  this.state = newState;
  this._safelyCallFSMEvent('_enter');
  // Deliberately re-read: `_enter` may already have moved the machine elsewhere.
  this.fire('enter-' + this.state);
};
/**
 * Makes sure an endpoint URL ends with the RTM protocol version.
 *
 * An endpoint that already ends with a version segment (`/v<digits>`, with or
 * without a trailing slash) is returned unchanged after a deprecation warning.
 * Accepting the trailing slash prevents `.../v2/` from becoming `.../v2/v2`.
 * Any other endpoint gets `RTM_VER` appended, separated by exactly one slash.
 *
 * @private
 * @param {string} ep - endpoint URL.
 * @returns {string} the endpoint including a protocol version.
 */
RTM.prototype._appendVersion = function (ep) {
  var versionMatch = ep.match(/\/(v\d+)\/?$/);
  var ver;
  if (versionMatch !== null) {
    ver = versionMatch[1];
    logger.warn(
      'satori-rtm-sdk: specifying RTM endpoint with protocol version is deprecated.\n' +
      'satori-rtm-sdk: please remove version \'' + ver + '\' from endpoint:\'' + ep + '\''
    );
    return ep;
  }
  if (ep[ep.length - 1] === '/') {
    return ep + RTM_VER;
  }
  return ep + '/' + RTM_VER;
};
// The RTM client constructor is the sole export of this module.
module.exports = RTM;