Class: Concurrent::Throttle — Concurrent Ruby

Note:

Edge Features are under active development and may change frequently.

A tool for managing the concurrency level of tasks. The maximum capacity is set in the constructor. Each #acquire lowers the available capacity and each #release increases it again. When no capacity is available, the current thread is either blocked, or an event is returned which will be resolved once capacity becomes available.
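For instance, the low-level API can be used directly; a minimal sketch (do_stuff below is a placeholder for real work):

throttle = Concurrent::Throttle.new 2 # at most two concurrent acquisitions

# Manual form: every acquired capacity must be released exactly once.
throttle.acquire
begin
  do_stuff
ensure
  throttle.release
end

# Block form: the capacity is released automatically when the block returns.
throttle.acquire { do_stuff }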

The more common usage of the Throttle is through a proxy executor, e.g. a_throttle.on(Concurrent.global_io_executor). Anything executed on the proxy executor is throttled and runs on the given executor. There can be more than one proxy executor. All abstractions which execute tasks accept an executor option, so the proxy executor can be injected into any abstraction to throttle its concurrency level.
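A rough sketch of that pattern (do_stuff is again a placeholder):

throttle     = Concurrent::Throttle.new 2
throttled_io = throttle.on(Concurrent.global_io_executor)

# Tasks posted to the proxy run on the global IO executor,
# but never more than two at a time.
10.times { throttled_io.post { do_stuff } }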

Examples

Limiting concurrency level of a concurrently executed block to two

max_two = Concurrent::Throttle.new 2

concurrency_level = Concurrent::AtomicFixnum.new
job = -> do
  concurrency_level.increment
  do_stuff
  current_concurrency_level = concurrency_level.value
  concurrency_level.decrement
  current_concurrency_level
end

Array.new(10) do
  Thread.new do
    max_two.acquire(&job)
  end
end.map(&:value)

Notice that the returned array contains no number bigger than 2, therefore the concurrency level of the block containing do_stuff never exceeded 2.

The remaining examples use a small helper which records the concurrency level observed while the given block runs:

def monitor_concurrency_level(concurrency_level, &block)
  concurrency_level.increment
  block.call
  current_concurrency_level = concurrency_level.value
  concurrency_level.decrement
  current_concurrency_level
end

throttle = Concurrent::Throttle.new 3
concurrency_level = Concurrent::AtomicFixnum.new

Array.new(10) do |i|
  throttle.future(i) do |arg|
    monitor_concurrency_level(concurrency_level) { do_stuff arg }
  end
end.map(&:value!)

The concurrency level does not rise above 3.

This works by setting the executor of the future created from the throttle. The executor is a proxy executor for Concurrent::Promises.default_executor and can be obtained using the #on method. Therefore the above example could be written more explicitly as follows:

Array.new(10) do |i|
  Concurrent::Promises.future_on(throttle.on(Concurrent::Promises.default_executor), i) do |arg|
    monitor_concurrency_level(concurrency_level) { do_stuff arg }
  end
end.map(&:value!)

Anything executed on the proxy executor is throttled. A throttle can have multiple proxy executors for different underlying executors; all their jobs share the same capacity provided by the throttle.
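A rough sketch of two proxy executors sharing one throttle's capacity (the variable names are illustrative):

throttle = Concurrent::Throttle.new 3
on_io    = throttle.on(:io)   # proxy over the global IO executor
on_fast  = throttle.on(:fast) # proxy over the global fast executor

# Across both proxies at most 3 tasks run at the same time.
5.times { on_io.post { do_stuff } }
5.times { on_fast.post { do_stuff } }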

Since the proxy executor becomes the executor of the future, any chained futures will also be throttled. This can be changed by switching to a different executor. In the following example the first 2 futures in the chain are throttled, the last is not.

concurrency_level_throttled   = Concurrent::AtomicFixnum.new 
concurrency_level_unthrottled = Concurrent::AtomicFixnum.new 
Array.new(10) do |i|
  throttle.future(i) do 
    monitor_concurrency_level(concurrency_level_throttled) { do_stuff } 
  end.then do |v|
    [v, monitor_concurrency_level(concurrency_level_throttled) { do_stuff }]
  end.then_on(:io) do |l1, l2|
    [l1, l2, monitor_concurrency_level(concurrency_level_unthrottled) { 5.times { do_stuff } }]
  end
end.map(&:value!) 

In the output you can see that the first 2 columns do not cross the capacity limit of 3, while the last column, which is unthrottled, does.

TODO (pitr-ch 20-Dec-2018): example with virtual throttled executor, throttling only part of promises chain.

Other abstractions

The proxy executor created from the throttle can be used with other abstractions as well, and they can be combined.

concurrency_level = Concurrent::AtomicFixnum.new
futures = Array.new(5) do |i|
  # throttled future
  throttle.future(i) do |arg|
    monitor_concurrency_level(concurrency_level) { do_stuff arg }
  end
end
agents = Array.new(5) do |i|
  agent = Concurrent::Agent.new 0
  # the agent's update is throttled as well, via the proxy executor
  agent.send_via(throttle.on(:io)) { monitor_concurrency_level(concurrency_level) { do_stuff } }
  agent
end
futures.map(&:value!)
agents.each { |a| a.await }.map(&:value)

There is no observed concurrency level above 3.

Constructor Details

#initialize(capacity) ⇒ Throttle
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 37

def initialize(capacity)
  super()
  @MaxCapacity            = capacity
  @Queue                  = LockFreeQueue.new
  @executor_cache         = [nil, nil]
  self.capacity = capacity
end
Instance Method Details

#acquire(timeout = nil) { ... } ⇒ Object, self, true, false

Blocks the current thread until there is capacity available in the throttle. The acquired capacity has to be returned to the throttle by calling #release. If a block is passed, it is called after the capacity is acquired and the capacity is automatically released after the block finishes.

# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 63

def acquire(timeout = nil, &block)
  event = acquire_or_event
  if event
    within_timeout = event.wait(timeout)
    event.on_resolution!(self, &:release) unless within_timeout
  else
    within_timeout = true
  end

  called = false
  if timeout
    if block
      if within_timeout
        called = true
        block.call
      else
        nil
      end
    else
      within_timeout
    end
  else
    if block
      called = true
      block.call
    else
      self
    end
  end
ensure
  release if called
end
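A minimal usage sketch of the block and timeout forms (do_stuff is a placeholder as in the examples above):

throttle = Concurrent::Throttle.new 1

# Block form: returns the block's result, the capacity is released automatically.
result = throttle.acquire { do_stuff }

# Timeout form without a block: returns true if capacity was acquired within
# 0.1 seconds, false otherwise; on success the capacity must be released.
if throttle.acquire(0.1)
  begin
    do_stuff
  ensure
    throttle.release
  end
end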
#available_capacity ⇒ Integer

Returns The available capacity.

# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 30

def available_capacity
  current_capacity = capacity
  current_capacity >= 0 ? current_capacity : 0
end
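For illustration (the capacity may go below zero internally when callers are queued, hence the clamp to 0):

throttle = Concurrent::Throttle.new 2
throttle.available_capacity # => 2
throttle.try_acquire        # => true
throttle.available_capacity # => 1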
#default_executor ⇒ ExecutorService
# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 183

def default_executor
  on(super)
end
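This is what makes futures created directly on the throttle throttled by default; a sketch:

throttle = Concurrent::Throttle.new 2

# Equivalent to:
# Concurrent::Promises.future_on(throttle.on(Concurrent::Promises.default_executor)) { do_stuff }
throttle.future { do_stuff }.value!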
#max_capacity ⇒ Integer

Returns The maximum capacity.

# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 46

def max_capacity
  @MaxCapacity
end
#on(executor = Promises::FactoryMethods.default_executor) ⇒ ExecutorService

Returns An executor which wraps the given executor and allows tasks to be posted only as the throttle's available capacity allows.

# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 162

def on(executor = Promises::FactoryMethods.default_executor)
  current_executor, current_cache = @executor_cache
  return current_cache if current_executor == executor && current_cache

  if current_executor.nil?
    proxy_executor  = ProxyExecutor.new(self, Concurrent.executor(executor))
    @executor_cache = [executor, proxy_executor]
    return proxy_executor
  else
    ProxyExecutor.new(self, Concurrent.executor(executor))
  end
end
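For example (a sketch; :io refers to the global IO executor):

throttle = Concurrent::Throttle.new 2

# The first proxy executor is cached, so repeated calls with the same
# underlying executor return the same object.
throttle.on(:io).equal? throttle.on(:io) # => true

# The proxy can be passed wherever an executor is accepted.
Concurrent::Promises.future_on(throttle.on(:io)) { do_stuff }.value!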
#release ⇒ self

Releases previously acquired capacity back to the Throttle. Has to be called exactly once for each acquired capacity.

# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 118

def release
  while true
    current_capacity = capacity
    if compare_and_set_capacity current_capacity, current_capacity + 1
      if current_capacity < 0
        Thread.pass until (trigger = @Queue.pop)
        trigger.resolve
      end
      return self
    end
  end
end
#to_s ⇒ String

Also known as: inspect

Returns Short string representation.

# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 133

def to_s
  format '%s capacity available %d of %d>', super[0..-2], capacity, @MaxCapacity
end
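For example (the object id in the output is illustrative):

throttle = Concurrent::Throttle.new 2
throttle.to_s
# => "#<Concurrent::Throttle:0x00007f... capacity available 2 of 2>"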
#try_acquire ⇒ true, false

Tries to acquire capacity from the throttle. Returns true when capacity is available and was acquired. The acquired capacity has to be returned to the throttle by calling #release.

# File 'lib/concurrent-ruby-edge/concurrent/edge/throttle.rb', line 102

def try_acquire
  while true
    current_capacity = capacity
    if current_capacity > 0
      return true if compare_and_set_capacity(
          current_capacity, current_capacity - 1)
    else
      return false
    end
  end
end
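A non-blocking usage sketch:

throttle = Concurrent::Throttle.new 1

if throttle.try_acquire
  begin
    do_stuff
  ensure
    throttle.release # return the capacity exactly once
  end
else
  # no capacity right now, do something else instead of blocking
end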
#any_event_on(default_executor, *futures_and_or_events) ⇒ Event Originally defined in module Promises::FactoryMethods

Creates a new event which becomes resolved after the first futures_and_or_events resolves. If resolved it does not propagate AbstractEventFuture#touch, leaving delayed futures un-executed if they are not required any more.

#any_fulfilled_future_on(default_executor, *futures_and_or_events) ⇒ Future Originally defined in module Promises::FactoryMethods

Creates a new future which is resolved after the first futures_and_or_events is fulfilled. Its result equals the result of the first resolved future or, if all futures_and_or_events reject, it has the reason of the last rejected future. If resolved it does not propagate AbstractEventFuture#touch, leaving delayed futures un-executed if they are not required any more. If event is supplied, which does not have value and can be only resolved, it's represented as :fulfilled with value nil.

#any_resolved_future_on(default_executor, *futures_and_or_events) ⇒ Future Originally defined in module Promises::FactoryMethods

Creates a new future which is resolved after the first futures_and_or_events is resolved. Its result equals the result of the first resolved future. If resolved it does not propagate AbstractEventFuture#touch, leaving delayed futures un-executed if they are not required any more. If event is supplied, which does not have value and can be only resolved, it's represented as :fulfilled with value nil.

#delay_on(default_executor, *args) {|*args| ... } ⇒ Future
#delay_on(default_executor) ⇒ Event Originally defined in module Promises::FactoryMethods

Creates a new event or future which is resolved only after it is touched, see AbstractEventFuture#touch.

#fulfilled_future(value, default_executor = self.default_executor) ⇒ Future Originally defined in module Promises::FactoryMethods

Creates a resolved future which will be fulfilled with the given value.

#future_on(default_executor, *args) {|*args| ... } ⇒ Future Originally defined in module Promises::FactoryMethods

Constructs a new Future which will be resolved after block is evaluated on default executor. Evaluation begins immediately.

#make_future(nil, default_executor = self.default_executor) ⇒ Event
#make_future(a_future, default_executor = self.default_executor) ⇒ Future
#make_future(an_event, default_executor = self.default_executor) ⇒ Event
#make_future(exception, default_executor = self.default_executor) ⇒ Future
#make_future(value, default_executor = self.default_executor) ⇒ Future Originally defined in module Promises::FactoryMethods

General constructor. Behaves differently based on the argument's type. It's provided for convenience but it's better to be explicit.

#rejected_future(reason, default_executor = self.default_executor) ⇒ Future Originally defined in module Promises::FactoryMethods

Creates a resolved future which will be rejected with the given reason.

#resolved_future(fulfilled, value, reason, default_executor = self.default_executor) ⇒ Future Originally defined in module Promises::FactoryMethods

Creates a resolved future which will be either fulfilled with the given value or rejected with the given reason.

#schedule_on(default_executor, intended_time, *args) {|*args| ... } ⇒ Future
#schedule_on(default_executor, intended_time) ⇒ Event Originally defined in module Promises::FactoryMethods

Creates a new event or future which is resolved in intended_time.

#zip_events_on(default_executor, *futures_and_or_events) ⇒ Event Originally defined in module Promises::FactoryMethods

Creates a new event which is resolved after all futures_and_or_events are resolved. (Future is resolved when fulfilled or rejected.)

#zip_futures_on(default_executor, *futures_and_or_events) ⇒ Future Originally defined in module Promises::FactoryMethods

Creates a new future which is resolved after all futures_and_or_events are resolved. Its value is an array of zipped future values. Its reason is an array of reasons for rejection. If there is an error it rejects. If event is supplied, which does not have value and can be only resolved, it's represented as :fulfilled with value nil.

#zip_futures_over(enumerable, &future_factory) ⇒ Future Originally defined in module Promises::FactoryMethods
Note:

Edge Features are under active development and may change frequently.

Shortcut of #zip_futures_over_on with default :io executor supplied.

#zip_futures_over_on(default_executor, enumerable) {|element| ... } ⇒ Future Originally defined in module Promises::FactoryMethods
Note:

Edge Features are under active development and may change frequently.

Creates a new future which is resolved after all the futures created by future_factory from the enumerable's elements are resolved. Simplified, it does: zip(*enumerable.map { |e| future e, &future_factory })
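A small sketch of that shortcut in use (the values are illustrative):

Concurrent::Promises.zip_futures_over_on(:io, [1, 2, 3]) do |element|
  element * 2
end.value! # => [2, 4, 6]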

