Class: Satori::RTM::Client

Inherits:
Object
Includes:
Logger
Defined in:
lib/satori-rtm-sdk/client.rb

Overview

The client that an application uses for accessing RTM.
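
A minimal usage sketch built only from the calls documented on this page. The helper name publish_and_wait and the 5-second default are illustrative and not part of the SDK. It assumes that connect returns once the connection is usable, and that client behaves like an instance created with Satori::RTM::Client.new(endpoint, appkey).

```ruby
# Publishes one message and waits for the RTM reply.
# `publish_and_wait` is a hypothetical helper, not an SDK method; `client`
# is expected to respond to connect, publish, wait_all_replies and close
# as documented on this page.
def publish_and_wait(client, channel, message, timeout_in_secs: 5)
  client.connect
  begin
    result = nil
    client.publish(channel, message) do |reply|
      result = reply.success? ? :published : :rejected
    end
    # wait_all_replies returns :ok or :timeout
    status = client.wait_all_replies(timeout_in_secs: timeout_in_secs)
    status == :ok ? result : :timeout
  ensure
    client.close
  end
end
```

The block passed to publish is what asks RTM for a reply, so wait_all_replies has something to wait for.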

Constant Summary

Constants included from Logger

Logger::ENV_FLAG

Instance Attribute Summary

I/O

Satori RTM operations

Instance Method Summary

Methods included from Logger

create_logger, default_level, included, logger, use_logger, use_std_logger

Constructor Details

#initialize(endpoint, appkey, opts = {}) ⇒ Client

Returns a new instance of Client.

Parameters:

  • endpoint (String)

    RTM endpoint

  • appkey (String)

    appkey used to access RTM

  • opts (Hash) (defaults to: {})

    options to create a client with.

Options Hash (opts):

  • :transport (WebSocket)

    WebSocket connection implementation to use



# File 'lib/satori-rtm-sdk/client.rb', line 23

def initialize(endpoint, appkey, opts = {})
  @waiters = {}
  @subscriptions = {}
  @url = URI.join(endpoint, 'v2?appkey=' + appkey).to_s
  @id = 0
  @encoder = JsonCodec.new
  @transport = init_transport(opts)
  @state = :init
end
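
The URL construction in the constructor can be tried in isolation. This is a sketch: rtm_url is a hypothetical helper name, and the endpoint and appkey values are placeholders rather than real credentials.

```ruby
require 'uri'

# Mirrors the URL construction in #initialize: the appkey is appended
# to the endpoint as the query string of the "v2" path.
# `rtm_url` is a hypothetical helper used only for this sketch.
def rtm_url(endpoint, appkey)
  URI.join(endpoint, 'v2?appkey=' + appkey).to_s
end

puts rtm_url('wss://rtm.example.com', 'my-appkey')
# prints "wss://rtm.example.com/v2?appkey=my-appkey"
```

The appkey is concatenated as-is, so this sketch (like the listing above) does not escape characters that are special in a query string.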

Instance Attribute Details

#transport ⇒ WebSocket (readonly)

Returns the WebSocket connection

Returns:

  • (WebSocket)

    the WebSocket connection



# File 'lib/satori-rtm-sdk/client.rb', line 10

class Client
  include EventEmitter
  include Logger

  attr_reader :transport
  private :on, :fire

  # Returns a new instance of Client.
  #
  # @param endpoint [String] RTM endpoint
  # @param appkey [String] appkey used to access RTM
  # @param opts [Hash] options to create a client with.
  # @option opts [WebSocket] :transport WebSocket connection implementation to use
  def initialize(endpoint, appkey, opts = {})
    @waiters = {}
    @subscriptions = {}
    @url = URI.join(endpoint, 'v2?appkey=' + appkey).to_s
    @id = 0
    @encoder = JsonCodec.new
    @transport = init_transport(opts)
    @state = :init
  end

  # @!group I/O

  # Connects to Satori RTM.
  #
  # @raise [ConnectionError] network error occurred when connecting
  # @return [void]
  def connect
    raise ConnectionError, "Client is a single-use object. You can't connect twice." if @state != :init
    logger.info "connecting to #{@url}"
    @transport.connect(@url)
  rescue => e
    # generate websocket close event
    on_websocket_close(1006, "Connect exception: #{e.message}")
    raise e
  end

  # Defines a callback to call when the client is successfully connected to Satori RTM.
  #
  # @yield Calls when client is connected
  # @return [void]
  def onopen(&fn)
    on :open, &fn
  end

  # Defines a callback to call when the client is disconnected or not able to connect to Satori RTM.
  #
  # If the client cannot connect, the +onclose+ block is also called, with the reason.
  #
  # @yield Calls when client is disconnected or not able to connect
  # @yieldparam close_event [CloseEvent] reason why client was closed
  # @return [void]
  def onclose(&fn)
    on :close, &fn
  end

  # Returns +true+ if the client is connected.
  #
  # @return [Boolean] +true+ if connected, +false+ otherwise
  def connected?
    @state == :open
  end

  # Gracefully closes the connection to Satori RTM.
  # @return [void]
  def close
    @transport.close
  end

  # Reads from a WebSocket with an optional timeout.
  #
  # If timeout is greater than zero, it specifies a maximum interval (in seconds)
  # to wait for any incoming WebSocket frames. If timeout is zero,
  # then the method returns without blocking. If the timeout is less
  # than zero, the method blocks indefinitely.
  #
  # @param opts [Hash] additional options
  # @option opts [Integer] :timeout_in_secs (-1) timeout for a read operation
  #
  # @raise [ConnectionError] network error occurred when reading data
  # @return [:ok] data was successfully read from the WebSocket
  # @return [:timeout] a blocking operation times out
  def sock_read(opts = {})
    timeout_in_secs = opts.fetch(:timeout_in_secs, -1)
    if timeout_in_secs >= 0
      @transport.read_with_timeout(timeout_in_secs)
    else
      @transport.read
    end
  end

  # Reads from a WebSocket in a non-blocking mode.
  #
  # @raise [ConnectionError] network error occurred when reading data
  # @return [:ok] data was successfully read from the WebSocket
  # @return [:would_block] read buffer is empty
  def sock_read_nonblock
    @transport.read_nonblock
  end

  # Reads repeatedly from a WebSocket.
  #
  # This method reads from a WebSocket repeatedly for a specified
  # duration (in seconds). If the duration is greater than zero, the
  # method blocks for that duration and reads all incoming
  # WebSocket frames. If the duration is less than zero, the
  # method blocks indefinitely.
  #
  # @param opts [Hash] additional options
  # @option opts [Integer] :duration_in_secs (-1) duration interval
  #
  # @raise [ConnectionError] network error occurred when reading data
  # @return [void]
  def sock_read_repeatedly(opts = {})
    duration_in_secs = opts.fetch(:duration_in_secs, -1)
    start = Time.now
    loop do
      diff = (Time.now - start)
      break if (duration_in_secs >= 0) && (duration_in_secs <= diff)
      @transport.read_with_timeout(duration_in_secs - diff)
    end
  end

  # Waits for the RTM replies to all pending requests.
  #
  # This method blocks until all RTM replies are received for all
  # pending requests.
  #
  # If timeout is greater than zero, it specifies a maximum interval (in seconds)
  # to wait for any incoming WebSocket frames.  If the timeout is less
  # than zero, the method blocks indefinitely.
  #
  # @note If a reply callback sends a new RTM request,
  #   this method waits for that reply too.
  #
  # @param opts [Hash] additional options
  # @option opts [Integer] :timeout_in_secs (-1) timeout for an operation
  #
  # @raise [ConnectionError] network error occurred when reading data
  #
  # @return [:ok] all replies are received
  # @return [:timeout] a blocking operation times out
  def wait_all_replies(opts = {})
    timeout_in_secs = opts.fetch(:timeout_in_secs, -1)
    start = Time.now
    rc = :ok
    loop do
      break if @waiters.empty?

      if timeout_in_secs >= 0
        diff = (Time.now - start)
        if timeout_in_secs <= diff
          rc = :timeout
          break
        end
        @transport.read_with_timeout(timeout_in_secs - diff)
      else
        @transport.read_with_timeout(1)
      end
    end
    rc
  end

  # @!endgroup

  # @!group Satori RTM operations

  # Publishes a message to a channel.
  #
  # @param channel [String] name of the channel
  # @param message [Object] message to publish
  # @yield Callback for an RTM reply. If the block is not given, then
  #   no reply will be sent to a client, regardless of the outcome
  # @yieldparam reply [BaseReply] RTM reply for delete request
  # @return [void]
  def publish(channel, message, &fn)
    publish_opts = {
      channel: channel,
      message: message
    }
    send_r('rtm/publish', publish_opts, &fn)
  end

  # Reads a message in a channel.
  #
  # RTM returns the message at the position specified in the request.
  # If there is no position specified, RTM defaults to the position of
  # the latest message in the channel. A +null+ message in the reply
  # PDU means that there were no messages at that position.
  #
  # @param channel [String] name of the channel
  # @param opts [Hash] additional options for +rtm/read+ request
  # @yield Callback for an RTM reply. If the block is not given, then
  #   no reply will be sent to a client, regardless of the outcome
  # @yieldparam reply [BaseReply] RTM reply for read request
  # @return [void]
  def read(channel, opts = {}, &fn)
    read_opts = opts.merge channel: channel
    send_r('rtm/read', read_opts, &fn)
  end

  # Writes the value of the specified key to the key-value store.
  #
  # The key is represented by a channel. In the current RTM implementation,
  # a write operation is the same as a publish operation.
  #
  # @param channel [String] name of the channel
  # @param message [Object] message to write
  # @yield Callback for an RTM reply. If the block is not given, then
  #   no reply will be sent to a client, regardless of the outcome
  # @yieldparam reply [BaseReply] RTM reply for write request
  # @return [void]
  def write(channel, message, &fn)
    write_opts = {
      channel: channel,
      message: message
    }
    send_r('rtm/write', write_opts, &fn)
  end

  # Deletes the value of the specified key from the key-value store.
  #
  # The key is represented by a channel, and only the last message in the
  # channel is relevant (it represents the value). Hence, publishing a +null+
  # value deletes the previous value (if any). A delete request
  # is the same as publishing or writing a null value to the channel.
  #
  # @param channel [String] name of the channel
  # @yield Callback for an RTM reply. If the block is not given, then
  #   no reply will be sent to a client, regardless of the outcome
  # @yieldparam reply [BaseReply] RTM reply for delete request
  # @return [void]
  def delete(channel, &fn)
    delete_opts = { channel: channel }
    send_r('rtm/delete', delete_opts, &fn)
  end

  # Subscribes to a channel
  #
  # When you create a subscription, you can specify additional subscription options (e.g. history or view).
  # The full list of subscription options is in the RTM API specification.
  #
  # The Satori SDK informs the user about any subscription state change by calling the block with the corresponding event.
  #
  # @see SubscriptionEvent Information about subscription events
  #
  # @param sid [String] subscription id
  # @param opts [Hash] additional options for +rtm/subscribe+ request
  # @yield RTM subscription callback
  # @yieldparam ctx [SubscriptionContext] current subscription context
  # @yieldparam event [SubscriptionEvent] subscription event
  # @return [void]
  #
  # @example
  #   client.subscribe 'animals' do |_ctx, event|
  #     case event.type
  #     when :subscribed
  #       puts "Subscribed to the channel: #{event.data[:subscription_id]}"
  #     when :data
  #       event.data[:messages].each { |msg| puts "Message is received #{msg}" }
  #     when :error
  #       puts "Subscription error: #{event.data[:error]} -- #{event.data[:reason]}"
  #     end
  #   end
  def subscribe(sid, opts = {}, &fn)
    request_opts = opts.merge subscription_id: sid
    request_opts[:channel] = sid unless %i[filter view].any? { |k| opts.key?(k) }

    context = SubscriptionContext.new(sid, opts, fn)

    init_reply = SubscriptionEvent.new(:init, nil)
    context.fn.call(context, init_reply)

    send('rtm/subscribe', request_opts) do |status, data|
      reply = context.handle_data(status, data)

      if @subscriptions.key?(sid) && @subscriptions[sid] != context
        prev_sub = @subscriptions[sid]
        prev_sub.mark_as_resubscribed
      end

      @subscriptions[sid] = context if reply.type == :subscribed

      context.fn.call(context, reply)
    end
  end

  # Unsubscribes the subscription with the specific +subscription_id+
  #
  # @param sid [String] subscription id
  # @yield Callback for an RTM reply
  # @yieldparam reply [BaseReply] RTM reply for unsubscribe request
  # @return [void]
  def unsubscribe(sid)
    request_opts = { subscription_id: sid }
    send('rtm/unsubscribe', request_opts) do |status, data|
      context = @subscriptions.delete(sid)
      if context
        reply = context.handle_data(status, data)
        context.fn.call(context, reply)
      end
      # pass base reply to unsubscribe block
      yield(BaseReply.new(status, data)) if block_given?
    end
  end

  # Authenticates a user with specific role and secret.
  #
  # Authentication is based on the +HMAC+ algorithm with +MD5+ hashing routine:
  # * The SDK obtains a nonce from the RTM in a handshake request
  # * The SDK then sends an authorization request with its role secret
  #   key hashed with the received nonce
  #
  # If authentication fails, the reason is passed to the block. On
  # success, the +auth/authenticate+ reply is passed to the block.
  #
  # Use Dev Portal to obtain the role and secret key for your application.
  #
  # @param role [String] role name
  # @param secret [String] role secret
  # @yield Callback for an RTM reply
  # @yieldparam reply [BaseReply] RTM reply for authenticate request
  # @return [void]
  #
  # @example
  #   client.authenticate role, role_secret do |reply|
  #     raise "Failed to authenticate: #{reply.data[:error]} -- #{reply.data[:reason]}" unless reply.success?
  #   end
  #   client.wait_all_replies
  def authenticate(role, secret)
    handshake_opts = {
      method: 'role_secret',
      data: { role: role }
    }
    send_r('auth/handshake', handshake_opts) do |reply|
      if reply.success?
        hash = hmac_md5(reply.data[:data][:nonce], secret)
        authenticate_opts = {
          method: 'role_secret',
          credentials: { hash: hash }
        }
        send_r('auth/authenticate', authenticate_opts) do |auth_reply|
          yield(auth_reply)
        end
      else
        yield(reply)
      end
    end
  end

  # @!endgroup

  private

  def pdu_to_reply_adapter(fn)
    return if fn.nil?

    proc do |status, data|
      reply = BaseReply.new(status, data)
      fn.call(reply)
    end
  end

  def send_r(action, body, &fn)
    send(action, body, &pdu_to_reply_adapter(fn))
  end

  def send(action, body, &block)
    pdu = { action: action, body: body }
    if block_given?
      pdu[:id] = gen_next_id
      @waiters[pdu[:id]] = block
    end
    logger.debug("-> #{pdu}")
    data = @encoder.encode(pdu)
    @transport.send(data, type: :text)
  end

  def gen_next_id
    @id += 1
  end

  def on_websocket_open
    logger.info('connection is opened')
    @state = :open
    fire(:open)
  end

  def on_websocket_close(code, reason)
    return if @state == :close
    @state = :close
    is_normal = (code == 1000)
    if is_normal
      logger.info('connection is closed normally')
    else
      logger.warn("connection is closed with code: '#{code}' -- '#{reason}'")
    end

    pass_disconnect_to_all_callbacks

    @waiters = {}
    @subscriptions = {}

    fire(:close, CloseEvent.new(code, reason))
  end

  def pass_disconnect_to_all_callbacks
    err = { error: 'disconnect', reason: 'Connection is closed' }

    @waiters.sort_by(&:first).map do |_, fn|
      safe_call(fn, :disconnect, err)
    end
    @subscriptions.map do |_, context|
      reply = context.handle_data(:disconnect, err)
      safe_call(context.fn, context, reply)
    end
  end

  def on_websocket_message(data, _type)
    pdu = @encoder.decode(data)
    logger.debug("<- #{pdu}")
    id = pdu[:id]
    if id.nil?
      on_unsolicited_pdu(pdu)
    else
      fn = @waiters.delete(id)
      fn.call(:pdu, pdu) unless fn.nil?
    end
  end

  def on_unsolicited_pdu(pdu)
    if pdu[:action] == '/error'
      reason = "Unclassified RTM error is received: #{pdu[:body][:error]} -- #{pdu[:body][:reason]}"
      @transport.close 1008, reason
    elsif pdu[:action].start_with? 'rtm/subscription'
      sid = pdu[:body][:subscription_id]
      context = @subscriptions[sid]
      if context
        reply = context.handle_data(:pdu, pdu)
        context.fn.call(context, reply)
      end
    end
  end

  def hmac_md5(nonce, secret)
    algorithm = OpenSSL::Digest.new('md5')
    digest = OpenSSL::HMAC.digest(algorithm, secret, nonce)
    Base64.encode64(digest).chomp
  end

  def init_transport(opts)
    transport = opts[:transport] || WebSocket.new
    transport.on(:open, &method(:on_websocket_open))
    transport.on(:message, &method(:on_websocket_message))
    transport.on(:close, &method(:on_websocket_close))
    transport
  end

  def safe_call(fn, *args)
    fn.call(*args)
  rescue => e
    logger.error(e)
  end
end
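
The private send and on_websocket_message methods in the listing above pair each request with its reply through a numeric id. The sketch below isolates that bookkeeping. ReplyCorrelator is a hypothetical class, it does no network I/O, and it assumes JSON with symbolized keys as the wire codec, since JsonCodec is not shown on this page.

```ruby
require 'json'

# Hypothetical stand-in for the id/callback bookkeeping in Client#send and
# Client#on_websocket_message. Encoded requests are collected in `outbox`
# instead of being written to a socket.
class ReplyCorrelator
  attr_reader :outbox

  def initialize
    @id = 0
    @waiters = {}
    @outbox = []
  end

  # Queues a request PDU. An id is attached only when a block is given,
  # because only then does the caller expect a reply.
  def request(action, body, &block)
    pdu = { action: action, body: body }
    if block
      pdu[:id] = (@id += 1)
      @waiters[pdu[:id]] = block
    end
    @outbox << JSON.generate(pdu)
  end

  # Routes an incoming frame to the waiter registered under its id, if any.
  def receive(frame)
    pdu = JSON.parse(frame, symbolize_names: true)
    waiter = @waiters.delete(pdu[:id])
    waiter&.call(pdu)
  end

  # True while at least one request is still waiting for its reply.
  def pending?
    !@waiters.empty?
  end
end
```

Each waiter is removed as soon as its reply arrives, which is why wait_all_replies can use an empty waiter table as its stop condition.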

Instance Method Details

#authenticate(role, secret) {|reply| ... } ⇒ void

This method returns an undefined value.

Authenticates a user with specific role and secret.

Authentication is based on the HMAC algorithm with MD5 hashing routine:

  • The SDK obtains a nonce from the RTM in a handshake request

  • The SDK then sends an authorization request with its role secret key hashed with the received nonce

If authentication fails, the reason is passed to the block. On success, the auth/authenticate reply is passed to the block.

Use Dev Portal to obtain the role and secret key for your application.
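
The hash step described above can be sketched on its own. role_secret_hash is a hypothetical name (the SDK's private helper is hmac_md5), and the nonce and secret are placeholder strings. The sketch uses pack('m0') instead of Base64.encode64(...).chomp; for a 16-byte MD5 digest both produce the same string.

```ruby
require 'openssl'

# Computes the value sent as credentials[:hash]: an HMAC-MD5 keyed with the
# role secret over the nonce, then Base64-encoded.
# `role_secret_hash` is a hypothetical name used only for this sketch.
def role_secret_hash(nonce, secret)
  digest = OpenSSL::HMAC.digest(OpenSSL::Digest.new('md5'), secret, nonce)
  [digest].pack('m0') # strict Base64, no trailing newline
end

puts role_secret_hash('nonce-from-handshake', 'role-secret')
```

The secret is the HMAC key and the nonce is the message, so the secret itself never travels over the connection.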

Examples:

client.authenticate role, role_secret do |reply|
  raise "Failed to authenticate: #{reply.data[:error]} -- #{reply.data[:reason]}" unless reply.success?
end
client.wait_all_replies

Parameters:

  • role (String)

    role name

  • secret (String)

    role secret

Yields:

  • Callback for an RTM reply

Yield Parameters:

  • reply (BaseReply)

    RTM reply for authenticate request



# File 'lib/satori-rtm-sdk/client.rb', line 341

def authenticate(role, secret)
  handshake_opts = {
    method: 'role_secret',
    data: { role: role }
  }
  send_r('auth/handshake', handshake_opts) do |reply|
    if reply.success?
      hash = hmac_md5(reply.data[:data][:nonce], secret)
      authenticate_opts = {
        method: 'role_secret',
        credentials: { hash: hash }
      }
      send_r('auth/authenticate', authenticate_opts) do |auth_reply|
        yield(auth_reply)
      end
    else
      yield(reply)
    end
  end
end

#close ⇒ void

This method returns an undefined value.

Gracefully closes the connection to Satori RTM.



# File 'lib/satori-rtm-sdk/client.rb', line 77

def close
  @transport.close
end

#connect ⇒ void

This method returns an undefined value.

Connects to Satori RTM.

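Raises:

  • (ConnectionError)

    network error occurred when connecting, or connect was called on a client that has already been used (the client is a single-use object)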



# File 'lib/satori-rtm-sdk/client.rb', line 39

def connect
  raise ConnectionError, "Client is a single-use object. You can't connect twice." if @state != :init
  logger.info "connecting to #{@url}"
  @transport.connect(@url)
rescue => e
  # generate websocket close event
  on_websocket_close(1006, "Connect exception: #{e.message}")
  raise e
end

#connected? ⇒ Boolean

Returns true if the client is connected.

Returns:

  • (Boolean)

    true if connected, false otherwise



# File 'lib/satori-rtm-sdk/client.rb', line 71

def connected?
  @state == :open
end

#delete(channel) {|reply| ... } ⇒ void

This method returns an undefined value.

Deletes the value of the specified key from the key-value store.

The key is represented by a channel, and only the last message in the channel is relevant (it represents the value). Hence, publishing a null value deletes the previous value (if any). A delete request is the same as publishing or writing a null value to the channel.
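
To make the equivalence concrete, the sketch below builds the request PDUs that delete and publish assemble, with request ids omitted. The two builder names are hypothetical, and JSON is assumed as the wire encoding.

```ruby
require 'json'

# Request PDUs as assembled by Client#delete and Client#publish (ids omitted).
# Both builder names are hypothetical and exist only for this sketch.
def delete_pdu(channel)
  { action: 'rtm/delete', body: { channel: channel } }
end

def publish_pdu(channel, message)
  { action: 'rtm/publish', body: { channel: channel, message: message } }
end

puts JSON.generate(delete_pdu('animals'))
# prints {"action":"rtm/delete","body":{"channel":"animals"}}
puts JSON.generate(publish_pdu('animals', nil)) # nil is encoded as JSON null
# prints {"action":"rtm/publish","body":{"channel":"animals","message":null}}
```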

Parameters:

  • channel (String)

    name of the channel

Yields:

  • Callback for an RTM reply. If the block is not given, then no reply will be sent to a client, regardless of the outcome

Yield Parameters:

  • reply (BaseReply)

    RTM reply for delete request



# File 'lib/satori-rtm-sdk/client.rb', line 244

def delete(channel, &fn)
  delete_opts = { channel: channel }
  send_r('rtm/delete', delete_opts, &fn)
end

#onclose {|close_event| ... } ⇒ void

This method returns an undefined value.

Defines a callback to call when the client is disconnected or not able to connect to Satori RTM.

If the client cannot connect, the onclose block is also called, with the reason.

Yields:

  • Calls when client is disconnected or not able to connect

Yield Parameters:

  • close_event (CloseEvent)

    reason why client was closed



# File 'lib/satori-rtm-sdk/client.rb', line 64

def onclose(&fn)
  on :close, &fn
end

#onopen { ... } ⇒ void

This method returns an undefined value.

Defines a callback to call when the client is successfully connected to Satori RTM.

Yields:

  • Calls when client is connected



# File 'lib/satori-rtm-sdk/client.rb', line 53

def onopen(&fn)
  on :open, &fn
end

#publish(channel, message) {|reply| ... } ⇒ void

This method returns an undefined value.

Publishes a message to a channel.

Parameters:

  • channel (String)

    name of the channel

  • message (Object)

    message to publish

Yields:

  • Callback for an RTM reply. If the block is not given, then no reply will be sent to a client, regardless of the outcome

Yield Parameters:

  • reply (BaseReply)

    RTM reply for publish request



# File 'lib/satori-rtm-sdk/client.rb', line 187

def publish(channel, message, &fn)
  publish_opts = {
    channel: channel,
    message: message
  }
  send_r('rtm/publish', publish_opts, &fn)
end

#read(channel, opts = {}) {|reply| ... } ⇒ void

This method returns an undefined value.

Reads a message in a channel.

RTM returns the message at the position specified in the request. If there is no position specified, RTM defaults to the position of the latest message in the channel. A null message in the reply PDU means that there were no messages at that position.

Parameters:

  • channel (String)

    name of the channel

  • opts (Hash) (defaults to: {})

    additional options for rtm/read request

Yields:

  • Callback for an RTM reply. If the block is not given, then no reply will be sent to a client, regardless of the outcome

Yield Parameters:

  • reply (BaseReply)

    RTM reply for read request



# File 'lib/satori-rtm-sdk/client.rb', line 208

def read(channel, opts = {}, &fn)
  read_opts = opts.merge channel: channel
  send_r('rtm/read', read_opts, &fn)
end

#sock_read(opts = {}) ⇒ :ok, :timeout

Reads from a WebSocket with an optional timeout.

If the timeout is greater than zero, it specifies the maximum interval (in seconds) to wait for incoming WebSocket frames. If the timeout is zero, the method returns without blocking. If the timeout is less than zero, the method blocks indefinitely.
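
One way a caller might use the timeout is to read until some condition holds or an overall limit passes. This is a sketch: read_until is a hypothetical helper, and client only needs to respond to sock_read as documented here.

```ruby
# Reads frames until the block returns true or `limit_in_secs` elapses.
# `read_until` is a hypothetical helper, not part of the SDK.
def read_until(client, limit_in_secs)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + limit_in_secs
  until yield
    remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
    return :timeout if remaining <= 0
    # Pass only the time that is left, so the overall limit is respected.
    return :timeout if client.sock_read(timeout_in_secs: remaining) == :timeout
  end
  :ok
end
```

A monotonic clock is used for the deadline so that wall-clock adjustments do not lengthen or shorten the wait.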

Parameters:

  • opts (Hash) (defaults to: {})

    additional options

Options Hash (opts):

  • :timeout_in_secs (Integer) — default: -1

    timeout for a read operation

Returns:

  • (:ok)

    data was successfully read from the WebSocket

  • (:timeout)

    a blocking operation times out

Raises:

  • (ConnectionError)

    network error occurred when reading data



# File 'lib/satori-rtm-sdk/client.rb', line 94

def sock_read(opts = {})
  timeout_in_secs = opts.fetch(:timeout_in_secs, -1)
  if timeout_in_secs >= 0
    @transport.read_with_timeout(timeout_in_secs)
  else
    @transport.read
  end
end

#sock_read_nonblock ⇒ :ok, :would_block

Reads from a WebSocket in a non-blocking mode.

Returns:

  • (:ok)

    data was successfully read from the WebSocket

  • (:would_block)

    read buffer is empty

Raises:

  • (ConnectionError)

    network error occurred when reading data



# File 'lib/satori-rtm-sdk/client.rb', line 108

def sock_read_nonblock
  @transport.read_nonblock
end

#sock_read_repeatedly(opts = {}) ⇒ void

This method returns an undefined value.

Reads repeatedly from a WebSocket.

This method reads from a WebSocket repeatedly for a specified duration (in seconds). If the duration is greater than zero, the method blocks for that duration and reads all incoming WebSocket frames. If the duration is less than zero, the method blocks indefinitely.
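
The bounded case can be sketched as a deadline loop that hands each read only the time still remaining. read_for and the read_once callable are hypothetical stand-ins for the method and its transport, and the sketch covers non-negative durations only.

```ruby
# Calls `read_once.call(remaining)` until `duration_in_secs` has elapsed
# and returns the number of reads performed.
# Hypothetical sketch of the bounded case; it is not the SDK method.
def read_for(duration_in_secs, read_once)
  start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  reads = 0
  loop do
    elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
    break if elapsed >= duration_in_secs
    read_once.call(duration_in_secs - elapsed)
    reads += 1
  end
  reads
end
```

A duration of zero ends the loop before the first read, which matches the break condition in the method source.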

Parameters:

  • opts (Hash) (defaults to: {})

    additional options

Options Hash (opts):

  • :duration_in_secs (Integer) — default: -1

    duration interval

Raises:

  • (ConnectionError)

    network error occurred when reading data



# File 'lib/satori-rtm-sdk/client.rb', line 125

def sock_read_repeatedly(opts = {})
  duration_in_secs = opts.fetch(:duration_in_secs, -1)
  start = Time.now
  loop do
    diff = (Time.now - start)
    break if (duration_in_secs >= 0) && (duration_in_secs <= diff)
    @transport.read_with_timeout(duration_in_secs - diff)
  end
end

#subscribe(sid, opts = {}) {|ctx, event| ... } ⇒ void

This method returns an undefined value.

Subscribes to a channel.

When you create a subscription, you can specify additional subscription options (e.g. history or view). The full list of subscription options is in the RTM API specification.

The Satori SDK informs the user about any subscription state change by calling the block with the corresponding event.

Examples:

client.subscribe 'animals' do |_ctx, event|
  case event.type
  when :subscribed
    puts "Subscribed to the channel: #{event.data[:subscription_id]}"
  when :data
    event.data[:messages].each { |msg| puts "Message is received #{msg}" }
  when :error
    puts "Subscription error: #{event.data[:error]} -- #{event.data[:reason]}"
  end
end
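
How the subscription id relates to the channel name can be seen by building the request body the way the method source does. This is a sketch: subscribe_body is a hypothetical helper, and the filter string is only an illustrative value.

```ruby
# Builds the body of the rtm/subscribe request the way #subscribe does:
# the subscription id doubles as the channel name unless the caller
# supplies a :filter or :view option.
# `subscribe_body` is a hypothetical helper used only for this sketch.
def subscribe_body(sid, opts = {})
  body = opts.merge(subscription_id: sid)
  body[:channel] = sid unless %i[filter view].any? { |k| opts.key?(k) }
  body
end

p subscribe_body('animals')
p subscribe_body('zebras', filter: "select * from `animals` where who = 'zebra'")
```

With a :filter or :view option, the first argument is only a subscription id and no :channel key is sent.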

Parameters:

  • sid (String)

    subscription id

  • opts (Hash) (defaults to: {})

    additional options for rtm/subscribe request

Yields:

  • RTM subscription callback

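Yield Parameters:

  • ctx (SubscriptionContext)

    current subscription context

  • event (SubscriptionEvent)

    subscription event

See Also:

  • SubscriptionEvent

    Information about subscription events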



# File 'lib/satori-rtm-sdk/client.rb', line 276

def subscribe(sid, opts = {}, &fn)
  request_opts = opts.merge subscription_id: sid
  request_opts[:channel] = sid unless %i[filter view].any? { |k| opts.key?(k) }

  context = SubscriptionContext.new(sid, opts, fn)

  init_reply = SubscriptionEvent.new(:init, nil)
  context.fn.call(context, init_reply)

  send('rtm/subscribe', request_opts) do |status, data|
    reply = context.handle_data(status, data)

    if @subscriptions.key?(sid) && @subscriptions[sid] != context
      prev_sub = @subscriptions[sid]
      prev_sub.mark_as_resubscribed
    end

    @subscriptions[sid] = context if reply.type == :subscribed

    context.fn.call(context, reply)
  end
end

#unsubscribe(sid) {|reply| ... } ⇒ void

This method returns an undefined value.

Cancels the subscription with the specified subscription_id.

Parameters:

  • sid (String)

    subscription id

Yields:

  • Callback for an RTM reply

Yield Parameters:

  • reply (BaseReply)

    RTM reply for unsubscribe request



# File 'lib/satori-rtm-sdk/client.rb', line 305

def unsubscribe(sid)
  request_opts = { subscription_id: sid }
  send('rtm/unsubscribe', request_opts) do |status, data|
    context = @subscriptions.delete(sid)
    if context
      reply = context.handle_data(status, data)
      context.fn.call(context, reply)
    end
    # pass base reply to unsubscribe block
    yield(BaseReply.new(status, data)) if block_given?
  end
end

#wait_all_replies(opts = {}) ⇒ :ok, :timeout

Note:

If a reply callback sends a new RTM request, this method waits for that reply too.

Waits for the RTM replies to all pending requests.

This method blocks until the RTM replies to all pending requests are received.

If the timeout is greater than zero, it specifies the maximum interval (in seconds) to wait for the replies. If the timeout is less than zero, the method blocks indefinitely.
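
The waiting logic, including the case where a reply callback registers a further request, can be sketched without a network. wait_until_drained, the waiters hash and the read_once callable are hypothetical stand-ins for the method, its pending-request table and its transport.

```ruby
# Blocks until `waiters` is empty or the timeout passes.
# `read_once` is called with the time budget for one read; it is expected
# to deliver replies, which removes entries from `waiters`.
# Hypothetical sketch, not the SDK method.
def wait_until_drained(waiters, timeout_in_secs, read_once)
  start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  until waiters.empty?
    if timeout_in_secs >= 0
      elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
      return :timeout if elapsed >= timeout_in_secs
      read_once.call(timeout_in_secs - elapsed)
    else
      read_once.call(1) # no overall limit: poll in 1-second slices
    end
  end
  :ok
end
```

Because the emptiness check runs again after every read, a request added from inside a reply callback keeps the loop going until its own reply arrives.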

Parameters:

  • opts (Hash) (defaults to: {})

    additional options

Options Hash (opts):

  • :timeout_in_secs (Integer) — default: -1

    timeout for an operation

Returns:

  • (:ok)

    all replies are received

  • (:timeout)

    a blocking operation times out

Raises:

  • (ConnectionError)

    network error occurred when reading data



# File 'lib/satori-rtm-sdk/client.rb', line 154

def wait_all_replies(opts = {})
  timeout_in_secs = opts.fetch(:timeout_in_secs, -1)
  start = Time.now
  rc = :ok
  loop do
    break if @waiters.empty?

    if timeout_in_secs >= 0
      diff = (Time.now - start)
      if timeout_in_secs <= diff
        rc = :timeout
        break
      end
      @transport.read_with_timeout(timeout_in_secs - diff)
    else
      @transport.read_with_timeout(1)
    end
  end
  rc
end

#write(channel, message) {|reply| ... } ⇒ void

This method returns an undefined value.

Writes the value of the specified key to the key-value store.

The key is represented by a channel. In the current RTM implementation, a write operation is the same as a publish operation.

Parameters:

  • channel (String)

    name of the channel

  • message (Object)

    message to write

Yields:

  • Callback for an RTM reply. If the block is not given, then no reply will be sent to a client, regardless of the outcome

Yield Parameters:

  • reply (BaseReply)

    RTM reply for write request



# File 'lib/satori-rtm-sdk/client.rb', line 224

def write(channel, message, &fn)
  write_opts = {
    channel: channel,
    message: message
  }
  send_r('rtm/write', write_opts, &fn)
end