A RetroSearch Logo

Home - News ( United States | United Kingdom | Italy | Germany ) - Football scores

Search Query:

Showing content from http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/Promises/Channel.html below:

Class: Concurrent::Promises::Channel — Concurrent Ruby

Note:

Edge Features are under active development and may change frequently.

A first in first out channel that accepts messages with push family of methods and returns messages with pop family of methods. Pop and push operations can be represented as futures, see #pop_op and #push_op. The capacity of the channel can be limited to support back pressure, use capacity option in #initialize. #pop method blocks ans #pop_op returns pending future if there is no message in the channel. If the capacity is limited the #push method blocks and #push_op returns pending future.

Examples

Let's start by creating a channel with a capacity of 2 messages.

ch = Concurrent::Promises::Channel.new 2

We push 3 messages, then it can be observed that the last thread pushing is sleeping since the channel is full.

threads = Array.new(3) { |i| Thread.new { ch.push message: i } } 
sleep 0.01 threads

When message is popped the last thread continues and finishes as well.

ch.pop                                   threads.map(&:join)

Same principle applies to popping as well. There are now 2 messages int he channel. Lets create 3 threads trying to pop a message, one will be blocked until new messages is pushed.

threads = Array.new(3) { |i| Thread.new { ch.pop } } 
sleep 0.01 threads 
ch.push message: 3
threads.map(&:value)
Promises integration

However this channel is implemented to integrate with promises therefore all operations can be represented as futures.

ch = Concurrent::Promises::Channel.new 2
push_operations = Array.new(3) { |i| ch.push_op message: i }

We do not have to sleep here letting the futures execute as Threads. Since there is capacity for 2 messages the Promises are immediately resolved without ever allocating a Thread to execute. Push and pop operations are often more efficient. The remaining pending push operation will also never require another thread, instead it will resolve when a message is popped from the channel making a space for a new message.

ch.pop_op.value!                         push_operations.map(&:value!)

pop_operations = Array.new(3) { |i| ch.pop_op }
ch.push message: 3 pop_operations.map(&:value) 
Selecting over channels

A selection over channels can be created with the .select_channel factory method. It will be fulfilled with a first message available in any of the channels. It returns a pair to be able to find out which channel had the message available.

ch1    = Concurrent::Promises::Channel.new 2
ch2    = Concurrent::Promises::Channel.new 2
ch1.push 1 
ch2.push 2 

Concurrent::Promises::Channel.select([ch1, ch2])
ch1.select(ch2)

Concurrent::Promises.future { 3 + 4 }.then_channel_push(ch1)
Concurrent::Promises::Channel. 
        select_op([ch1, ch2]).
    then('got number %03d from ch%d') { |(channel, value), format| 
      format format, value, [ch1, ch2].index(channel).succ
    }.value!                             
try_ variants

All blocking operations (#pop, #push, #select) have non-blocking variant with try_ prefix. They always return immediately and indicate either success or failure.

ch
ch.try_push 1                            ch.try_push 2                            ch.try_push 3                            ch.try_pop                               ch.try_pop                               ch.try_pop                               
Timeouts

All blocking operations (#pop, #push, #select) have a timeout option. Similar to try_ variants it will indicate success or timing out, when the timeout option is used.

ch
ch.push 1, 0.01                          ch.push 2, 0.01                          ch.push 3, 0.01                          ch.pop 0.01                              ch.pop 0.01                              ch.pop 0.01                              
Backpressure

Most importantly the channel can be used to create systems with backpressure. A self adjusting system where the producers will slow down if the consumers are not keeping up.

channel = Concurrent::Promises::Channel.new 2
log     = Concurrent::Array.new          
producers = Array.new 2 do |i|
  Thread.new(i) do |i|
    4.times do |j|
      log.push format "producer %d pushing %d", i, j      
      channel.push [i, j]      
    end
  end
end

consumers = Array.new 4 do |i|
  Thread.new(i) do |consumer|
    2.times do |j|
      from, message = channel.pop
      log.push format "consumer %d got %d. payload %d from producer %d", 
                      consumer, j, message, from       
      do_stuff      
    end
  end
end

producers.map(&:join)
consumers.map(&:join)
log

The producers are much faster than consumers (since they do_stuff which takes some time)
but as it can be seen from the log they fill the channel and then they slow down until there is space available in the channel.

If permanent allocation of threads to the producers and consumers has to be avoided, the threads can be replaced with promises that run a thread pool.

channel = Concurrent::Promises::Channel.new 2
log     = Concurrent::Array.new          
def produce(channel, log, producer, i)
  log.push format "producer %d pushing %d", producer, i      
  channel.push_op([producer, i]).then do
    i + 1 < 4 ? produce(channel, log, producer, i + 1) : :done    
  end      
end                                      
def consume(channel, log, consumer, i)
  channel.pop_op.then(consumer, i) do |(from, message), consumer, i|
    log.push format "consumer %d got %d. payload %d from producer %d", 
                    consumer, i, message, from       
    do_stuff
    i + 1 < 2 ? consume(channel, log, consumer, i + 1) : :done       
  end
end                                      
producers = Array.new 2 do |i|
  Concurrent::Promises.future(channel, log, i) { |*args| produce *args, 0 }.run
end

consumers = Array.new 4 do |i|
  Concurrent::Promises.future(channel, log, i) { |*args| consume *args, 0 }.run
end

producers.map(&:value!)                  consumers.map(&:value!)                  log
Synchronization of workers by passing a value

If the capacity of the channel is zero then any push operation will succeed only when there is a matching pop operation which can take the message. The operations have to be paired to succeed.

channel = Concurrent::Promises::Channel.new 0
thread = Thread.new { channel.pop }; sleep 0.01 
thread
channel.try_push(:v1)                    push = channel.push_op(:v2)
thread.value                             channel.pop                              push
Class Method Details .select(channels, timeout = nil) ⇒ ::Array(Channel, Object), nil
322
323
324
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 322

def select(channels, timeout = nil)
  channels.first.select(channels[1..-1], timeout)
end
.select_matching(matcher, channels, timeout = nil) ⇒ ::Array(Channel, Object), nil
340
341
342
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 340

def select_matching(matcher, channels, timeout = nil)
  channels.first.select_matching(matcher, channels[1..-1], timeout)
end
.select_op(channels, probe = Promises.resolvable_future) ⇒ Future(::Array(Channel, Object))
316
317
318
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 316

def select_op(channels, probe = Promises.resolvable_future)
  channels.first.select_op(channels[1..-1], probe)
end
.select_op_matching(matcher, channels, probe = Promises.resolvable_future) ⇒ Future(::Array(Channel, Object))
334
335
336
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 334

def select_op_matching(matcher, channels, probe = Promises.resolvable_future)
  channels.first.select_op_matching(matcher, channels[1..-1], probe)
end
.try_select(channels) ⇒ ::Array(Channel, Object)
310
311
312
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 310

def try_select(channels)
  channels.first.try_select(channels[1..-1])
end
.try_select_matching(matcher, channels) ⇒ ::Array(Channel, Object)
328
329
330
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 328

def try_select_matching(matcher, channels)
  channels.first.try_select_matching(matcher, channels[1..-1])
end
Instance Method Details #capacity ⇒ Integer

Returns Maximum capacity of the Channel.

295
296
297
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 295

def capacity
  @Capacity
end
#peek(no_value = nil) ⇒ Object, no_value

Behaves as #try_pop but it does not remove the message from the channel

209
210
211
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 209

def peek(no_value = nil)
  peek_matching ANY, no_value
end
#peek_matching(matcher, no_value = nil) ⇒ Object, no_value

Behaves as #try_pop but it does not remove the message from the channel

215
216
217
218
219
220
221
222
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 215

def peek_matching(matcher, no_value = nil)
  @Mutex.synchronize do
    message = ns_shift_message matcher, false
    return message if message != NOTHING
    message = ns_consume_pending_push matcher, false
    return message != NOTHING ? message : no_value
  end
end
#pop(timeout = nil, timeout_value = nil) ⇒ Object, nil Note:

This function potentially blocks current thread until it can continue. Be careful it can deadlock.

Blocks current thread until a message is available in the channel for popping.

177
178
179
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 177

def pop(timeout = nil, timeout_value = nil)
  pop_matching ANY, timeout, timeout_value
end
#pop_matching(matcher, timeout = nil, timeout_value = nil) ⇒ Object, nil Note:

This function potentially blocks current thread until it can continue. Be careful it can deadlock.

Blocks current thread until a message is available in the channel for popping.

183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 183

def pop_matching(matcher, timeout = nil, timeout_value = nil)
      probe = @Mutex.synchronize do
    message = ns_shift_message matcher
    if message == NOTHING
      message = ns_consume_pending_push matcher
      return message if message != NOTHING
    else
      new_message = ns_consume_pending_push ANY
      @Messages.push new_message unless new_message == NOTHING
      return message
    end

    probe = Promises.resolvable_future
    @Probes.push probe, false, matcher
    probe
  end

  probe.value!(timeout, timeout_value, [true, timeout_value, nil])
end
#pop_op(probe = Promises.resolvable_future) ⇒ Future(Object)

Returns a future witch will become fulfilled with a value from the channel when one is available. If it is later waited on the operation with a timeout e.g.channel.pop_op.wait(1) it will not prevent the channel to fulfill the operation later after the timeout. The operation has to be either processed later

pop_op = channel.pop_op
if pop_op.wait(1)
  process_message pop_op.value
else
  pop_op.then { |message| log_unprocessed_message message }
end

or the operation can be prevented from completion after timing out by using channel.pop_op.wait(1, [true, nil, nil]). It will fulfill the operation on timeout preventing channel from doing the operation, e.g. popping a message.

160
161
162
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 160

def pop_op(probe = Promises.resolvable_future)
  @Mutex.synchronize { ns_pop_op(ANY, probe, false) }
end
#pop_op_matching(matcher, probe = Promises.resolvable_future) ⇒ Future(Object)

Returns a future witch will become fulfilled with a value from the channel when one is available. If it is later waited on the operation with a timeout e.g.channel.pop_op.wait(1) it will not prevent the channel to fulfill the operation later after the timeout. The operation has to be either processed later

pop_op = channel.pop_op
if pop_op.wait(1)
  process_message pop_op.value
else
  pop_op.then { |message| log_unprocessed_message message }
end

or the operation can be prevented from completion after timing out by using channel.pop_op.wait(1, [true, nil, nil]). It will fulfill the operation on timeout preventing channel from doing the operation, e.g. popping a message.

166
167
168
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 166

def pop_op_matching(matcher, probe = Promises.resolvable_future)
  @Mutex.synchronize { ns_pop_op(matcher, probe, false) }
end
#push(message, timeout = nil) ⇒ self, true, false Note:

This function potentially blocks current thread until it can continue. Be careful it can deadlock.

Blocks current thread until the message is pushed into the channel.

120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 120

def push(message, timeout = nil)
  pushed_op = @Mutex.synchronize do
    return timeout ? true : self if ns_try_push(message)

    pushed = Promises.resolvable_future
        @PendingPush.push message, pushed
    pushed
  end

  result = pushed_op.wait!(timeout, [true, self, nil])
  result == pushed_op ? self : result
end
#push_op(message) ⇒ ResolvableFuture(self)

Returns future which will fulfill when the message is pushed to the channel. If it is later waited on the operation with a timeout e.g.channel.pop_op.wait(1) it will not prevent the channel to fulfill the operation later after the timeout. The operation has to be either processed later

pop_op = channel.pop_op
if pop_op.wait(1)
  process_message pop_op.value
else
  pop_op.then { |message| log_unprocessed_message message }
end

or the operation can be prevented from completion after timing out by using channel.pop_op.wait(1, [true, nil, nil]). It will fulfill the operation on timeout preventing channel from doing the operation, e.g. popping a message.

101
102
103
104
105
106
107
108
109
110
111
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 101

def push_op(message)
  @Mutex.synchronize do
    if ns_try_push(message)
      Promises.fulfilled_future self
    else
      pushed = Promises.resolvable_future
      @PendingPush.push message, pushed
      return pushed
    end
  end
end
#select(channels, timeout = nil) ⇒ ::Array(Channel, Object), nil Note:

This function potentially blocks current thread until it can continue. Be careful it can deadlock.

As #select_op but does not return future, it block current thread instead until there is a message available in the receiver or in any of the channels.

278
279
280
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 278

def select(channels, timeout = nil)
  select_matching ANY, channels, timeout
end
#select_matching(matcher, channels, timeout = nil) ⇒ ::Array(Channel, Object), nil Note:

This function potentially blocks current thread until it can continue. Be careful it can deadlock.

As #select_op but does not return future, it block current thread instead until there is a message available in the receiver or in any of the channels.

284
285
286
287
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 284

def select_matching(matcher, channels, timeout = nil)
  probe = select_op_matching(matcher, channels)
  probe.value!(timeout, nil, [true, nil, nil])
end
#select_op(channels, probe = Promises.resolvable_future) ⇒ ResolvableFuture(::Array(Channel, Object))

When message is available in the receiver or any of the provided channels the future is fulfilled with a channel message pair. The returned channel is the origin of the message. If it is later waited on the operation with a timeout e.g.channel.pop_op.wait(1) it will not prevent the channel to fulfill the operation later after the timeout. The operation has to be either processed later

pop_op = channel.pop_op
if pop_op.wait(1)
  process_message pop_op.value
else
  pop_op.then { |message| log_unprocessed_message message }
end

or the operation can be prevented from completion after timing out by using channel.pop_op.wait(1, [true, nil, nil]). It will fulfill the operation on timeout preventing channel from doing the operation, e.g. popping a message.

257
258
259
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 257

def select_op(channels, probe = Promises.resolvable_future)
  select_op_matching ANY, channels, probe
end
#select_op_matching(matcher, channels, probe = Promises.resolvable_future) ⇒ ResolvableFuture(::Array(Channel, Object))

When message is available in the receiver or any of the provided channels the future is fulfilled with a channel message pair. The returned channel is the origin of the message. If it is later waited on the operation with a timeout e.g.channel.pop_op.wait(1) it will not prevent the channel to fulfill the operation later after the timeout. The operation has to be either processed later

pop_op = channel.pop_op
if pop_op.wait(1)
  process_message pop_op.value
else
  pop_op.then { |message| log_unprocessed_message message }
end

or the operation can be prevented from completion after timing out by using channel.pop_op.wait(1, [true, nil, nil]). It will fulfill the operation on timeout preventing channel from doing the operation, e.g. popping a message.

263
264
265
266
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 263

def select_op_matching(matcher, channels, probe = Promises.resolvable_future)
  [self, *channels].each { |ch| ch.partial_select_op matcher, probe }
  probe
end
#size ⇒ Integer

Returns The number of messages currently stored in the channel.

290
291
292
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 290

def size
  @Mutex.synchronize { @Messages.size }
end
#to_s ⇒ String Also known as: inspect

Returns Short string representation.

300
301
302
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 300

def to_s
  format '%s capacity taken %s of %s>', super[0..-2], size, @Capacity
end
#try_pop(no_value = nil) ⇒ Object, no_value

Pop a message from the channel if there is one available.

138
139
140
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 138

def try_pop(no_value = nil)
  try_pop_matching ANY, no_value
end
#try_pop_matching(matcher, no_value = nil) ⇒ Object, no_value

Pop a message from the channel if there is one available.

145
146
147
148
149
150
151
152
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 145

def try_pop_matching(matcher, no_value = nil)
  @Mutex.synchronize do
    message = ns_shift_message matcher
    return message if message != NOTHING
    message = ns_consume_pending_push matcher
    return message != NOTHING ? message : no_value
  end
end
#try_push(message) ⇒ true, false

Push the message into the channel if there is space available.

77
78
79
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 77

def try_push(message)
  @Mutex.synchronize { ns_try_push(message) }
end
#try_select(channels) ⇒ ::Array(Channel, Object), nil

If message is available in the receiver or any of the provided channels the channel message pair is returned. If there is no message nil is returned. The returned channel is the origin of the message.

232
233
234
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 232

def try_select(channels)
  try_select_matching ANY, channels
end
#try_select_matching(matcher, channels) ⇒ ::Array(Channel, Object), nil

If message is available in the receiver or any of the provided channels the channel message pair is returned. If there is no message nil is returned. The returned channel is the origin of the message.

238
239
240
241
242
243
244
245
# File 'lib/concurrent-ruby-edge/concurrent/edge/channel.rb', line 238

def try_select_matching(matcher, channels)
  message = nil
  channel = [self, *channels].find do |ch|
    message = ch.try_pop_matching(matcher, NOTHING)
    message != NOTHING
  end
  channel ? [channel, message] : nil
end

RetroSearch is an open source project built by @garambo | Open a GitHub Issue

Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo

HTML: 3.2 | Encoding: UTF-8 | Version: 0.7.4