Edge Features are under active development and may change frequently. Features developed in concurrent-ruby-edge are expected to move to concurrent-ruby when finalised.
A tool for managing the concurrency level of tasks. The maximum capacity is set in the constructor. Each acquire lowers the available capacity and each release increases it. When no capacity is available, the current thread is either blocked or given an event which is resolved when capacity becomes available.
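The capacity bookkeeping described above can be pictured with a small standard-library model. This is only a sketch: TinyThrottle and its internals are hypothetical names invented for illustration, and it uses a Mutex and ConditionVariable rather than whatever Concurrent::Throttle does internally.

```ruby
# Hypothetical stand-in for illustration only; not Concurrent::Throttle itself.
class TinyThrottle
  def initialize(capacity)
    @available = capacity
    @lock      = Mutex.new
    @freed     = ConditionVariable.new
  end

  # Current free capacity.
  def available_capacity
    @lock.synchronize { @available }
  end

  # Blocks the calling thread until one unit of capacity is free, then takes it.
  def acquire
    @lock.synchronize do
      @freed.wait(@lock) while @available.zero?
      @available -= 1
    end
    self
  end

  # Takes one unit only if it is free right now; never blocks.
  def try_acquire
    @lock.synchronize do
      return false if @available.zero?
      @available -= 1
      true
    end
  end

  # Returns one unit and wakes a single waiting thread, if any.
  def release
    @lock.synchronize do
      @available += 1
      @freed.signal
    end
    self
  end
end

throttle = TinyThrottle.new(2)
throttle.acquire
throttle.acquire
snapshot = [throttle.available_capacity, throttle.try_acquire]
throttle.release
snapshot << throttle.available_capacity
```

With both units taken, the non-blocking attempt fails; after one release a single unit is free again.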
The more common usage of the Throttle is with a proxy executor, a_throttle.on(Concurrent.global_io_executor). Anything executed on the proxy executor is throttled and runs on the given executor. There can be more than one proxy executor. All abstractions which execute tasks have an option to specify an executor, so the proxy executor can be injected into any abstraction to throttle its concurrency level.
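The proxy-executor idea can be sketched without the library. In this hypothetical model an executor is anything that responds to post(&task); ThreadPerTaskExecutor and ThrottledProxy are invented names, and the real ProxyExecutor may be organised quite differently. The proxy forwards a task to the wrapped executor only while capacity is free and parks the rest until a running task finishes.

```ruby
# Hypothetical executor: runs every posted task on a fresh thread.
class ThreadPerTaskExecutor
  def post(&task)
    Thread.new(&task)
  end
end

# Hypothetical proxy: forwards at most `limit` tasks at a time to `executor`.
class ThrottledProxy
  def initialize(limit, executor)
    @free     = limit
    @executor = executor
    @pending  = []
    @lock     = Mutex.new
  end

  def post(&task)
    run_now = @lock.synchronize do
      if @free > 0
        @free -= 1
        true
      else
        @pending << task # parked until a running task finishes
        false
      end
    end
    forward(task) if run_now
  end

  private

  def forward(task)
    @executor.post do
      begin
        task.call
      ensure
        # Hand the capacity straight to the next parked task, or free it.
        next_task = @lock.synchronize do
          waiting = @pending.shift
          @free += 1 unless waiting
          waiting
        end
        forward(next_task) if next_task
      end
    end
  end
end

running = 0
peak    = 0
gauge   = Mutex.new
done    = Queue.new
proxy   = ThrottledProxy.new(2, ThreadPerTaskExecutor.new)

10.times do
  proxy.post do
    gauge.synchronize { running += 1; peak = [peak, running].max }
    sleep 0.01
    gauge.synchronize { running -= 1 }
    done << :finished
  end
end
10.times { done.pop } # wait for every task to finish
```

Because capacity is handed over only after a task has finished, the observed peak never exceeds the limit of 2.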
Limiting concurrency level of a concurrently executed block to two
max_two = Concurrent::Throttle.new 2
concurrency_level = Concurrent::AtomicFixnum.new
job = -> do
  concurrency_level.increment
  do_stuff
  current_concurrency_level = concurrency_level.value
  concurrency_level.decrement
  current_concurrency_level
end
Array.new(10) do
  Thread.new do
    max_two.acquire(&job)
  end
end.map(&:value)
Notice that the returned array contains no number bigger than 2; therefore the concurrency level of the block containing do_stuff was never bigger than 2.
def monitor_concurrency_level(concurrency_level, &block)
  concurrency_level.increment
  block.call
  current_concurrency_level = concurrency_level.value
  concurrency_level.decrement
  return current_concurrency_level
end
throttle = Concurrent::Throttle.new 3
concurrency_level = Concurrent::AtomicFixnum.new
Array.new(10) do |i|
  throttle.future(i) do |arg|
    monitor_concurrency_level(concurrency_level) { do_stuff arg }
  end
end.map(&:value!)
The concurrency level does not rise above 3.
It works by setting the executor of the future created from the throttle. The executor is a proxy executor for Concurrent::Promises.default_executor, which can be obtained using the #on method. Therefore the above example could instead be written more explicitly as follows:
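Array.new(10) do |i|
  # create the throttled future explicitly on the proxy executor
  Concurrent::Promises.future_on(throttle.on(Concurrent::Promises.default_executor), i) do |arg|
    monitor_concurrency_level(concurrency_level) { do_stuff arg }
  end
end.map(&:value!)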
Anything executed on the proxy executor is throttled. A throttle can have several proxy executors for different executors; all jobs share the same capacity provided by the throttle.
Since the proxy executor becomes the executor of the future, any chained futures are also throttled. This can be changed by using a different executor. In the following example the first 2 futures in the chain are throttled and the last is not.
concurrency_level_throttled = Concurrent::AtomicFixnum.new
concurrency_level_unthrottled = Concurrent::AtomicFixnum.new
Array.new(10) do |i|
  throttle.future(i) do
    monitor_concurrency_level(concurrency_level_throttled) { do_stuff }
  end.then do |v|
    [v, monitor_concurrency_level(concurrency_level_throttled) { do_stuff }]
  end.then_on(:io) do |l1, l2|
    [l1, l2, monitor_concurrency_level(concurrency_level_unthrottled) { 5.times { do_stuff } }]
  end
end.map(&:value!)
In the output you can see that the first 2 columns do not cross the capacity limit of 3, while the last column, which is unthrottled, does.
TODO (pitr-ch 20-Dec-2018): example with virtual throttled executor, throttling only part of promises chain.
Other abstraction
The proxy executor created with a throttle can also be used with other abstractions, and these can be combined.
concurrency_level = Concurrent::AtomicFixnum.new
futures = Array.new(5) do |i|
  throttle.future(i) do |arg|
    monitor_concurrency_level(concurrency_level) { do_stuff arg }
  end
end
agents = Array.new(5) do |i|
  agent = Concurrent::Agent.new 0
  # run the agent update on a throttled executor, measured with the same counter
  agent.send_via(throttle.on(:io)) { monitor_concurrency_level(concurrency_level) { do_stuff } }
  agent
end
futures.map(&:value!)
agents.each { |a| a.await }.map(&:value)
There is no observed concurrency level above 3.
Constructor Details
#initialize(capacity) ⇒ Throttle
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 37
def initialize(capacity)
  super()
  @MaxCapacity = capacity
  @Queue = LockFreeQueue.new
  @executor_cache = [nil, nil]
  self.capacity = capacity
end
Instance Method Details
#acquire(timeout = nil) { ... } ⇒ Object, self, true, false
Blocks the current thread until there is capacity available in the throttle. The acquired capacity has to be returned to the throttle by calling #release. If a block is passed, it is called after the capacity is acquired, and the capacity is released automatically after the block has executed.
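The four possible return values of #acquire depend on whether a timeout and a block are given. The following standard-library model is a sketch of those rules only; BlockingThrottle and wait_for_capacity are hypothetical names, and the waiting is done with a ConditionVariable rather than with the library's events.

```ruby
# Hypothetical model of the #acquire return-value rules; not the library class.
class BlockingThrottle
  def initialize(capacity)
    @available = capacity
    @lock      = Mutex.new
    @freed     = ConditionVariable.new
  end

  def release
    @lock.synchronize do
      @available += 1
      @freed.signal
    end
    self
  end

  def acquire(timeout = nil)
    acquired = wait_for_capacity(timeout)
    if timeout
      return acquired unless block_given? # true or false, caller releases on true
      return nil unless acquired          # timed out, block is not called
    elsif !block_given?
      return self                         # caller must call #release later
    end
    begin
      yield
    ensure
      release # block form returns the capacity automatically
    end
  end

  private

  # Waits until capacity is free or the timeout elapses; true when acquired.
  def wait_for_capacity(timeout)
    now      = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }
    deadline = timeout && now.call + timeout
    @lock.synchronize do
      while @available.zero?
        remaining = deadline && deadline - now.call
        return false if remaining && remaining <= 0
        @freed.wait(@lock, remaining)
      end
      @available -= 1
      true
    end
  end
end

throttle = BlockingThrottle.new(1)
outcomes = []
outcomes << throttle.acquire { :ran }         # block form, released afterwards
outcomes << throttle.acquire.equal?(throttle) # no block: capacity is now held
outcomes << throttle.acquire(0.05)            # nothing free within the timeout
outcomes << throttle.acquire(0.05) { :ran }   # nothing free, block skipped
throttle.release
outcomes << throttle.acquire(0.05) { :ran }   # capacity is free again
```

Only the block form returns capacity on its own; after a plain acquire, or a successful timed acquire without a block, the caller is responsible for calling release.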
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 63
def acquire(timeout = nil, &block)
  event = acquire_or_event
  if event
    within_timeout = event.wait(timeout)
    event.on_resolution!(self, &:release) unless within_timeout
  else
    within_timeout = true
  end
  called = false
  if timeout
    if block
      if within_timeout
        called = true
        block.call
      else
        nil
      end
    else
      within_timeout
    end
  else
    if block
      called = true
      block.call
    else
      self
    end
  end
ensure
  release if called
end
#available_capacity ⇒ Integer
Returns the available capacity.
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 30
def available_capacity
  current_capacity = capacity
  current_capacity >= 0 ? current_capacity : 0
end
#default_executor ⇒ ExecutorService
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 183
def default_executor
  on(super)
end
#max_capacity ⇒ Integer
Returns the maximum capacity.
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 46
def max_capacity
  @MaxCapacity
end
#on(executor = Promises::FactoryMethods.default_executor) ⇒ ExecutorService
Returns an executor which wraps the given executor and allows tasks to be posted only as the available capacity in the throttle allows.
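Read literally, the source of #on keeps a one-slot cache: the proxy for the first executor ever requested is remembered, and any other executor gets a fresh proxy on every call. The sketch below models just that rule; CachingThrottle and Proxy are hypothetical stand-ins, with Proxy taking the place of ProxyExecutor and plain symbols taking the place of real executors.

```ruby
# Hypothetical stand-in for ProxyExecutor.
Proxy = Struct.new(:throttle, :executor)

# Hypothetical model of the one-slot cache; not the library class.
class CachingThrottle
  def initialize
    @executor_cache = [nil, nil]
  end

  def on(executor)
    cached_executor, cached_proxy = @executor_cache
    return cached_proxy if cached_executor == executor && cached_proxy

    proxy = Proxy.new(self, executor)
    # Only the first executor ever requested is remembered.
    @executor_cache = [executor, proxy] if cached_executor.nil?
    proxy
  end
end

throttle  = CachingThrottle.new
first_io  = throttle.on(:io)
second_io = throttle.on(:io)
fast_a    = throttle.on(:fast)
fast_b    = throttle.on(:fast)
```

Under this reading first_io and second_io are the same object while fast_a and fast_b are distinct, although all of them would still share the one throttle's capacity.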
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 162
def on(executor = Promises::FactoryMethods.default_executor)
  current_executor, current_cache = @executor_cache
  return current_cache if current_executor == executor && current_cache
  if current_executor.nil?
    proxy_executor = ProxyExecutor.new(self, Concurrent.executor(executor))
    @executor_cache = [executor, proxy_executor]
    return proxy_executor
  else
    ProxyExecutor.new(self, Concurrent.executor(executor))
  end
end
#release ⇒ self
Releases previously acquired capacity back to Throttle. Has to be called exactly once for each acquired capacity.
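Why each acquisition needs exactly one release becomes clearer with a single-threaded model of the counter. The sketch assumes that acquire_or_event, whose source is not shown on this page, decrements the capacity and queues an event whenever nothing was free, so a negative capacity counts the waiters. EventThrottle and Waiter are hypothetical names, and the compare-and-set loop and lock-free queue of the real class are replaced by plain Ruby values.

```ruby
# Hypothetical stand-in for a resolvable event.
Waiter = Struct.new(:resolved)

# Hypothetical single-threaded model; not the library class.
class EventThrottle
  attr_reader :capacity

  def initialize(capacity)
    @capacity = capacity
    @queue    = [] # waiters in arrival order
  end

  # Assumed behaviour: nil when capacity was free, otherwise a queued waiter.
  def acquire_or_event
    @capacity -= 1
    return nil if @capacity >= 0

    waiter = Waiter.new(false)
    @queue.push(waiter)
    waiter
  end

  # Gives one unit back; if anyone was waiting, the oldest waiter is resolved.
  def release
    previous = @capacity
    @capacity += 1
    @queue.shift.resolved = true if previous < 0
    self
  end
end

throttle  = EventThrottle.new(1)
immediate = throttle.acquire_or_event # capacity was free
first     = throttle.acquire_or_event # has to wait
second    = throttle.acquire_or_event # has to wait behind first
capacity_while_waiting = throttle.capacity
throttle.release
```

One release resolves exactly one waiter, so a missing release leaves a waiter blocked forever and an extra release admits one task too many.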
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 118
def release
  while true
    current_capacity = capacity
    if compare_and_set_capacity current_capacity, current_capacity + 1
      if current_capacity < 0
        Thread.pass until (trigger = @Queue.pop)
        trigger.resolve
      end
      return self
    end
  end
end
#to_s ⇒ String
Also known as: inspect
Returns a short string representation.
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 133
def to_s
  format '%s capacity available %d of %d>', super[0..-2], capacity, @MaxCapacity
end
#try_acquire ⇒ true, false
Tries to acquire capacity from the throttle. Returns true when there is capacity available. The acquired capacity has to be returned to the throttle by calling #release.
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 102
def try_acquire
  while true
    current_capacity = capacity
    if current_capacity > 0
      return true if compare_and_set_capacity(
          current_capacity, current_capacity - 1)
    else
      return false
    end
  end
end
#any_event_on(default_executor, *futures_and_or_events) ⇒ Event
Originally defined in module Promises::FactoryMethods
Creates a new event which becomes resolved after the first futures_and_or_events resolves. If resolved it does not propagate AbstractEventFuture#touch, leaving delayed futures un-executed if they are not required any more.
#any_fulfilled_future_on(default_executor, *futures_and_or_events) ⇒ Future
Originally defined in module Promises::FactoryMethods
Creates a new future which is resolved after the first futures_and_or_events is fulfilled. Its result equals the result of the first resolved future or if all futures_and_or_events reject, it has reason of the last rejected future. If resolved it does not propagate AbstractEventFuture#touch, leaving delayed futures un-executed if they are not required any more. If event is supplied, which does not have value and can be only resolved, it's represented as :fulfilled with value nil.
Creates a new future which is resolved after the first futures_and_or_events is resolved. Its result equals the result of the first resolved future. If resolved it does not propagate AbstractEventFuture#touch, leaving delayed futures un-executed if they are not required any more. If event is supplied, which does not have value and can be only resolved, it's represented as :fulfilled with value nil.
Creates a new event or future which is resolved only after it is touched, see AbstractEventFuture#touch.
#fulfilled_future(value, default_executor = self.default_executor) ⇒ Future
Originally defined in module Promises::FactoryMethods
Creates a resolved future which will be fulfilled with the given value.
#future_on(default_executor, *args) {|*args| ... } ⇒ Future
Originally defined in module Promises::FactoryMethods
Constructs a new Future which will be resolved after the block is evaluated on the default executor. Evaluation begins immediately.
#make_future(nil, default_executor = self.default_executor) ⇒ Event
#make_future(a_future, default_executor = self.default_executor) ⇒ Future
#make_future(an_event, default_executor = self.default_executor) ⇒ Event
#make_future(exception, default_executor = self.default_executor) ⇒ Future
#make_future(value, default_executor = self.default_executor) ⇒ Future
Originally defined in module Promises::FactoryMethods
General constructor. Behaves differently based on the argument's type. It's provided for convenience but it's better to be explicit.
#rejected_future(reason, default_executor = self.default_executor) ⇒ Future
Originally defined in module Promises::FactoryMethods
Creates a resolved future which will be rejected with the given reason.
#resolved_future(fulfilled, value, reason, default_executor = self.default_executor) ⇒ Future
Originally defined in module Promises::FactoryMethods
Creates a resolved future which will be either fulfilled with the given value or rejected with the given reason.
#schedule_on(default_executor, intended_time, *args) {|*args| ... } ⇒ Future
#schedule_on(default_executor, intended_time) ⇒ Event
Originally defined in module Promises::FactoryMethods
Creates a new event or future which is resolved in intended_time.
#zip_events_on(default_executor, *futures_and_or_events) ⇒ Event
Originally defined in module Promises::FactoryMethods
Creates a new event which is resolved after all futures_and_or_events are resolved. (Future is resolved when fulfilled or rejected.)
#zip_futures_on(default_executor, *futures_and_or_events) ⇒ Future
Originally defined in module Promises::FactoryMethods
Creates a new future which is resolved after all futures_and_or_events are resolved. Its value is an array of zipped future values. Its reason is an array of reasons for rejection. If there is an error it rejects. If event is supplied, which does not have value and can be only resolved, it's represented as :fulfilled with value nil.
Shortcut of #zip_futures_over_on with default :io executor supplied.
Creates a new future which is resolved after all the futures created by future_factory from the enumerable's elements are resolved. Simplified, it does: zip(*enumerable.map { |e| future e, &future_factory })
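The map-then-zip shape of that one-liner can be shown with plain threads. This is an analogy only: zip_over is a hypothetical helper, it blocks until every task is done, and it returns an array, whereas the documented method returns a future.

```ruby
# Thread-based analogy; zip_over is a hypothetical helper, not a library method.
def zip_over(enumerable, &factory)
  # Start one concurrent task per element, passing the element to the factory.
  workers = enumerable.map { |element| Thread.new(element, &factory) }
  # Wait for all of them and collect the values in element order.
  workers.map(&:value)
end

squares = zip_over([1, 2, 3]) { |n| n * n }
```

The results keep the order of the input elements regardless of which task finishes first.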