subscription.js

var Observer = require('./observer.js');
var objectAssign = require('object-assign');

/**
 * Creates an instance of a subscription. This function inherits functions from
 * [Observer.js]{@link Observer}, such as [on(event, fn)]{@link Observer#on}
 * @class
 * @augments Observer
 *
 * @description
 * <code>Subscription</code> represents a subscription to a channel. Its functions manage the
 * subscription state and respond to subscription events.
 *
 * Use <code>Subscription</code> functions to specify code that executes when an event occurs or
 * when the subscription enters a specific state.
 *
 * For example, use <code>Subscription.on("rtm/subscription/data", fn())</code> to specify a
 * function that's executed when the subscription receives a message. Use
 * <code>Subscription.on("enter-subscribed", fn())</code> to specify a function that's executed
 * when the subscription is active.
 *
 * When your application receives a channel message, the <code>data</code> event occurs and the
 * message is passed as a Protocol Data Unit (<strong>PDU</strong>) to the function specified for
 * <code>Subscription.on("rtm/subscription/data", fn())</code>.
 *
 * The format of the PDU in messages you receive is the same as the
 * subprotocol you specify in the client constructor {@link RTM}. RTM automatically converts
 * messages before it sends them.
 *
 * You can also specify an event handler function that executes when the subscription enters or
 * leaves subscribed state. For example, to specify an event handler for the
 * <code>enter-subscribed</code> event, use <code>Subscription.on("enter-subscribed", fn()}</code>.
 *
 * <strong>Note:</strong> When the connection from the client to RTM drops, all subscriptions are
 * unsubscribed and then resubscribed when the connection is restored.
 *
 * @example
 * // Creates an RTM client
 * var rtm = new RTM('YOUR_ENDPOINT', 'YOUR_APPKEY');
 * // create a new subscription to the channel named 'your-channel'
 * var subscription = rtm.subscribe('your-channel');
 *
 * subscription.on('rtm/subscription/data', function (pdu) {
 *     pdu.body.messages.forEach(console.log);
 * });
 * subscription.on('enter-subscribed', function () {
 *     console.log('Subscribed!');
 * });
 * subscription.on('data', function (pdu) {
 *     if (pdu.action.endWith('/error')) {
 *         rtm.restart();
 *     }
 * });
 *
 * @param {string} subscriptionId - unique identifier for the subscription. If you don't use the
 * <code>filter</code> parameter to specify a streamview, subscriptionId is treated as a channel
 * name.
 *
 * @param {Object} _opts - additional subscription options
 *
 * @param {boolean} [_opts.mode] - subscription mode
 *
 * @param {object} [_opts.bodyOpts={}]
 * Additional options for the subscription. These options are sent to RTM in the <code>body</code>
 * element of the PDU that represents the subscribe request. The keys in <code>bodyOpts</code> are
 * documented in the parameter list for [RTM.subscribe()]{@link RTM#subscribe}
 *
 * @throws {TypeError} indicates that mandatory parameters are missing or invalid.
 *
 */
function Subscription(subscriptionId, _opts) {
  if (typeof subscriptionId !== 'string') {
    throw new TypeError('"subscriptionId" is missing or invalid');
  }
  Observer.call(this);

  this.options = objectAssign({}, _opts);
  if (typeof this.options.bodyOpts !== 'object') {
    this.options.bodyOpts = {};
  }
  if (this.options.fastForward) {
    this.options.bodyOpts.fast_forward = true;
  }

  this.position = null;
  this.subscriptionId = subscriptionId;
  this.wasSubscribedAtLeastOnce = false;
  this.isSubscribed = false;
  /* eslint-enable camelcase */
}

Subscription.prototype = Object.create(Observer.prototype);

Subscription.prototype.subscribePdu = function (id) {
  var body;

  // inherit all users options like 'filter', 'fast_forward'
  if (this.options.bodyOpts.filter) {
    body = { subscription_id: this.subscriptionId };
  } else {
    body = { channel: this.subscriptionId };
  }

  body = objectAssign(body, this.options.bodyOpts);

  if (this.wasSubscribedAtLeastOnce) {
    if (this.position !== null) {
      body.position = this.position;
    } else {
      delete body.position;
    }
  }
  return {
    id: id,
    action: 'rtm/subscribe',
    body: body,
  };
};

Subscription.prototype.unsubscribePdu = function (id) {
  return {
    id: id,
    action: 'rtm/unsubscribe',
    body: { subscription_id: this.subscriptionId },
  };
};

Subscription.prototype.onPdu = function (pdu) {
  var body = pdu.body;
  if (body && body.position) {
    this._onPosition(body.position);
  }
  if (pdu.action === 'rtm/subscribe/ok') {
    this.isSubscribed = true;
    this.wasSubscribedAtLeastOnce = true;
    this.fire('enter-subscribed');
  }
  if (pdu.action === 'rtm/unsubscribe/ok') {
    this._markAsUnsubscribed();
  }
  if (pdu.action === 'rtm/subscription/error') {
    this._markAsUnsubscribed();
  }
  this.fire('data', pdu, this);
  this.fire(pdu.action, pdu, this);
};

Subscription.prototype.onDisconnect = function () {
  this._markAsUnsubscribed();
};

Subscription.prototype._onPosition = function (position) {
  if (this.options.trackPosition) {
    this.position = position;
  }
  this.fire('position', position, this);
};

Subscription.prototype._markAsUnsubscribed = function () {
  if (this.isSubscribed) {
    this.isSubscribed = false;
    this.fire('leave-subscribed');
  }
};

module.exports = Subscription;