Edge Features are under active development and may change frequently.
Features in concurrent-ruby-edge are expected to move to concurrent-ruby when finalised.
This Actor model implementation makes actors very cheap to create and discard. Thousands of actors can be created, allowing you to break the program into smaller, maintainable pieces without violating the single responsibility principle.
What is an actor model?
Actor-based concurrency is all the rage in some circles. Originally described in 1973, the actor model is a paradigm for creating asynchronous, concurrent objects that is becoming increasingly popular. Much has changed since actors were first written about four decades ago, which has led to a serious fragmentation within the actor community. There is no universally accepted, strict definition of "actor" and actor implementations differ widely between languages and libraries.
Wiki definition is pretty good: The actor model in computer science is a mathematical model of concurrent computation that treats actors as the universal primitives of concurrent digital computation: in response to a message that it receives, an actor can make local decisions, create more actors, send more messages, and determine how to respond to the next message received.
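To make the definition concrete, here is a minimal sketch of the idea using only Ruby's standard library. It is not how concurrent-ruby implements actors; TinyActor and its methods are hypothetical names used for illustration. The actor owns a mailbox, processes one message at a time on its own thread, and replies through a one-slot queue.

```ruby
# Standard-library sketch of an actor; TinyActor is a hypothetical name,
# not part of concurrent-ruby.
class TinyActor
  def initialize(&handler)
    @mailbox = Queue.new   # thread-safe FIFO of pending messages
    @handler = handler
    @thread  = Thread.new do
      # Process one message at a time; pop returns nil once the
      # mailbox is closed and empty, which ends the loop.
      while (envelope = @mailbox.pop)
        message, reply = envelope
        reply << @handler.call(message)
      end
    end
  end

  # Enqueue a message and return a queue that will receive the reply.
  def ask(message)
    reply = Queue.new
    @mailbox << [message, reply]
    reply
  end

  def stop
    @mailbox.close
    @thread.join
  end
end

total   = 0
counter = TinyActor.new { |n| total += n } # state is touched only by the actor thread
replies = [1, 2, 3].map { |n| counter.ask(n) }
results = replies.map(&:pop)
counter.stop
```

Because the mailbox is drained by a single thread, the handler never runs concurrently with itself, which is the property that makes actor state safe without explicit locks.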
Why?
Concurrency is hard to get right; actors are one of many ways to simplify the problem.
Quick example
An example:
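require 'concurrent-edge'

class Counter < Concurrent::Actor::Context
  def initialize(initial_value)
    @count = initial_value
  end

  # Override on_message to define the actor's behaviour.
  def on_message(message)
    if Integer === message
      @count += message
    end
  end
end

# Create a new actor named :first, passing 5 to initialize.
counter = Counter.spawn(:first, 5)
# Tell it to count, fire and forget.
counter.tell(1)
counter << 1
# ask returns a future; its value is the result of on_message.
counter.ask(0).class
counter.ask(0).value

class Adder < Concurrent::Actor::RestartingContext
  def initialize(init)
    @count = init
  end

  def on_message(message)
    case message
    when :add
      @count += 1
    else
      # Pass the message on to the next behaviour in the chain.
      pass
    end
  end
end

# Spawn with options; args are passed to initialize.
adder = Adder.spawn(name: :adder, link: true, args: [1])
adder.parent

# tell returns the reference, so calls can be chained.
adder.tell(:add).tell(:add)
adder.ask!(:add)
# ask! raises on failure; rescue $! returns the raised error instead.
adder.ask!(:bad) rescue $!
adder.ask!(:add)
adder.ask!(:terminate!)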
Spawning actors
Sending messages
Reference#tell returns self (the reference), which allows message telling to be chained.
Messages are processed in the same order as they are sent by a sender. They may be interleaved with messages from other senders, though.
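The ordering guarantee can be pictured with a plain FIFO mailbox. The sketch below uses a standard-library Queue in place of an actor's mailbox (an assumption made for illustration; it does not use the Actor API): each sender's messages keep their relative order, while messages from different senders may interleave.

```ruby
# A Queue stands in for the actor's mailbox (illustration only).
mailbox = Queue.new

# Two senders push numbered messages concurrently.
senders = %i[a b].map do |name|
  Thread.new { 5.times { |i| mailbox << [name, i] } }
end
senders.each(&:join)

# Drain the mailbox in arrival order.
received = []
received << mailbox.pop until mailbox.empty?

# Group by sender: each sender's sequence is still 0..4,
# even though the overall interleaving is not fixed.
per_sender = received.group_by(&:first)
                     .transform_values { |messages| messages.map(&:last) }
```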
Immutability
Messages sent between actors should be immutable. Gems like Algebrick are very helpful.
require 'algebrick'

# Actor message protocol definition with Algebrick
Protocol = Algebrick.type do
  variants Add      = type { fields! a: Numeric, b: Numeric },
           Subtract = type { fields! a: Numeric, b: Numeric }
end

class Calculator < Concurrent::Actor::RestartingContext
  include Algebrick::Matching

  def on_message(message)
    # Pattern matching on the message with deconstruction
    match message,
          (on Add.(~any, ~any) do |a, b|
            a + b
          end),
          # or using multi-assignment
          (on ~Subtract do |(a, b)|
            a - b
          end)
  end
end

calculator = Calculator.spawn('calculator')
addition = calculator.ask Add[1, 2]
subtraction = calculator.ask Subtract[1, 0.5]
results = (addition & subtraction)
results.value!
calculator.ask! :terminate!
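If adding a gem is not an option, a plain Struct frozen in its constructor gives a similar effect. This is a standard-library sketch; Deposit and its fields are hypothetical names. Note that freeze is shallow, so the field values themselves should also be immutable or frozen.

```ruby
# Hypothetical message type: frozen as soon as it is constructed.
Deposit = Struct.new(:account, :amount) do
  def initialize(*)
    super
    freeze
  end
end

message = Deposit.new('acc-1'.freeze, 10)

# Any attempt to mutate the message raises FrozenError.
mutation_rejected =
  begin
    message.amount = 5
    false
  rescue FrozenError
    true
  end
```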
Actor definition
A new actor is defined by subclassing RestartingContext or Context and defining its abstract methods. AbstractContext can be subclassed directly to implement more specific behaviour; see the Root implementation.
Context: basic context of an actor. It supports only linking and simply terminates on error. Uses Behaviour.basic_behaviour_definition.
RestartingContext: context of an actor for robust systems. It supports supervision and linking, and pauses on error. Uses Behaviour.restarting_behaviour_definition.
Example of an actor definition:
Message = Struct.new :action, :value

class AnActor < Concurrent::Actor::RestartingContext
  def initialize(init)
    @counter = init
  end

  # Override on_message to define the actor's behaviour on message received.
  def on_message(message)
    case message.action
    when :add
      @counter = @counter + message.value
    when :subtract
      @counter = @counter - message.value
    when :value
      @counter
    else
      pass
    end
  end

  # Set the counter back to zero when the actor is reset.
  def on_event(event)
    if event == :reset
      @counter = 0
    end
  end
end

an_actor = AnActor.spawn name: 'an_actor', args: 10
an_actor << Message.new(:add, 1) << Message.new(:subtract, 2)
an_actor.ask!(Message.new(:value, nil))
# :boo does not respond to #action, so processing it raises an error.
an_actor << :boo << Message.new(:add, 1)
an_actor.ask!(Message.new(:value, nil))
an_actor << :terminate!
See the methods of AbstractContext for what else can be tweaked, e.g. AbstractContext#default_reference_class.
Reference
Reference is the public interface of Actor instances. It is used for sending messages and can be freely passed around the application. It also provides some basic information about the actor; see PublicDelegations.
AdHoc.spawn('printer') { -> message { puts message } }
Garbage collection
A spawned actor cannot be garbage-collected until it is terminated, because the parent actor holds a reference to it.
Parent-child relationship, name, and path
The path is constructed recursively as parent.path + self.name up to Actor.root, e.g. /an_actor/its_child.
Actors have a modular architecture, achieved by combining a light core with a chain of behaviours. Each message or internal event propagates through the chain, allowing the behaviours to react based on their responsibility.
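The chain idea can be sketched in plain Ruby as a chain of responsibility. The classes below (SketchBehaviour, Pings, Doubles) are hypothetical and are not the library's Behaviour classes; they only show how a message travels down the chain until some link handles it, and how an unhandled message fails at the end.

```ruby
# Hypothetical base class: each link either handles a message or passes it on.
class SketchBehaviour
  def initialize(successor = nil)
    @successor = successor
  end

  # Forward the message to the next link, or fail if this is the last one.
  def pass(message)
    raise ArgumentError, "unknown message #{message.inspect}" unless @successor

    @successor.on_message(message)
  end
end

# Handles :ping, passes everything else along.
class Pings < SketchBehaviour
  def on_message(message)
    message == :ping ? :pong : pass(message)
  end
end

# Handles integers, passes everything else along.
class Doubles < SketchBehaviour
  def on_message(message)
    message.is_a?(Integer) ? message * 2 : pass(message)
  end
end

chain   = Pings.new(Doubles.new)
ponged  = chain.on_message(:ping) # handled by the first link
doubled = chain.on_message(21)    # passed on and handled by the second link
```

Adding or removing a link changes what the whole chain can handle without touching the other links, which is the kind of modularity the paragraph above describes.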
Behaviour::Linking:
Links the actor to other actors and sends the actor's events to them, like :terminated, :paused, :resumed, errors, etc. A linked actor needs to handle those messages.
listener = AdHoc.spawn name: :listener do
  lambda do |message|
    case message
    when Reference
      if message.ask!(:linked?)
        message << :unlink
      else
        message << :link
      end
    else
      puts "got event #{message.inspect} from #{envelope.sender}"
    end
  end
end

an_actor = AdHoc.spawn name: :an_actor, supervise: true, behaviour_definition: Behaviour.restarting_behaviour_definition do
  lambda { |message| raise 'failed' }
end

# link the actor
listener.ask(an_actor).wait
an_actor.ask(:fail).wait
# unlink the actor
listener.ask(an_actor).wait
an_actor.ask(:fail).wait
an_actor << :terminate!
This produces only two events; the other events happened after unlinking:
got event #<RuntimeError: failed> from #<Concurrent::Actor::Reference /an_actor (Concurrent::Actor::Utils::AdHoc)>
got event :reset from #<Concurrent::Actor::Reference /an_actor (Concurrent::Actor::Utils::AdHoc)>
Behaviour::Awaits:
Accepts :await messages, which allows waiting for the actor to process all previously sent messages.
actor << :a << :b
actor.ask(:await).wait
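One way to picture why this works: because the mailbox is FIFO, a marker message is reached only after everything enqueued before it. The sketch below mimics that with a standard-library Queue and a plain thread; it illustrates the idea and is not the library's implementation.

```ruby
mailbox   = Queue.new
processed = []

# A plain thread stands in for the actor (illustration only).
actor = Thread.new do
  while (message = mailbox.pop)
    if message.is_a?(Array) && message.first == :await
      message.last << :done # wake up whoever is waiting
    else
      processed << message
    end
  end
end

waiter = Queue.new
mailbox << :a << :b << [:await, waiter]
waiter.pop                  # blocks until :a and :b have been processed
snapshot = processed.dup

mailbox.close
actor.join
```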
Behaviour::Pausing:
Allows actors to be paused on errors. While an actor is paused, all arriving messages are collected and processed after the actor is resumed or reset. Resume simply continues with the next message. Reset also reinitializes the context.
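The pause-and-resume flow can be sketched without the library. PausableMailbox below is a hypothetical class, written under the assumption that a failing message is dropped and later messages are buffered until resume; it is a simplified model and not the library's implementation.

```ruby
# Hypothetical model of pausing: buffer messages while paused,
# replay them in order on resume.
class PausableMailbox
  def initialize(&handler)
    @handler = handler
    @paused  = false
    @buffer  = []
  end

  def deliver(message)
    if @paused
      @buffer << message # collected while paused
    else
      @handler.call(message)
    end
  rescue StandardError
    @paused = true       # the failing message is dropped, the mailbox pauses
  end

  # Continue with the next buffered message, stopping if a failure pauses again.
  def resume
    @paused = false
    deliver(@buffer.shift) until @buffer.empty? || @paused
  end
end

processed = []
mailbox = PausableMailbox.new do |message|
  raise 'failed' if message == :boom

  processed << message
end

[1, :boom, 2, 3].each { |message| mailbox.deliver(message) }
before_resume = processed.dup
mailbox.resume
```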
Behaviour::Supervising:
Handles supervised actors. Handle configures what to do with a failed child: :terminate!, :resume!, :reset!, or :restart!. Strategy sets :one_for_one (restarts just the failed actor) or :one_for_all (restarts all child actors).
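The difference between the two strategies comes down to which children are affected when one of them fails. A minimal sketch, using a hypothetical helper (actors_to_restart is not a library method) and symbols in place of actor references:

```ruby
# Hypothetical helper: pick the children a supervisor would act on.
def actors_to_restart(strategy, failed, children)
  case strategy
  when :one_for_one then [failed]     # only the failed child
  when :one_for_all then children.dup # every child of the supervisor
  else raise ArgumentError, "unknown strategy #{strategy.inspect}"
  end
end

children = %i[worker_a worker_b worker_c]
one = actors_to_restart(:one_for_one, :worker_b, children)
all = actors_to_restart(:one_for_all, :worker_b, children)
```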
Behaviour::ExecutesContext:
Delegates messages and events to the AbstractContext instance.
Behaviour::ErrorsOnUnknownMessage:
Simply fails when a message arrives here. It's usually the last behaviour.
Behaviour::Termination:
Handles actor termination. Waits until all its children are terminated; this can be configured on behaviour initialization.
Behaviour::RemovesChild:
Removes terminated children.
If needed, new behaviours can be added or old ones removed to get the required behaviour.
Context uses an Array of behaviours and their construction parameters.
[[Behaviour::SetResults, :terminate!],
[Behaviour::RemovesChild],
[Behaviour::Termination],
[Behaviour::Linking],
[Behaviour::Awaits],
[Behaviour::ExecutesContext],
[Behaviour::ErrorsOnUnknownMessage]]
RestartingContext uses an Array of behaviours and their construction parameters.
[[Behaviour::SetResults, :pause!],
[Behaviour::RemovesChild],
[Behaviour::Termination],
[Behaviour::Linking],
[Behaviour::Pausing],
[Behaviour::Supervising, :reset!, :one_for_one],
[Behaviour::Awaits],
[Behaviour::ExecutesContext],
[Behaviour::ErrorsOnUnknownMessage]]
Actors run on a shared thread pool, which allows the user to create many actors cheaply. The downside is that these actors cannot be used directly to do IO or other blocking operations, because blocking operations could starve the global_fast_executor. However, there are two options. The first is to delegate the blocking work to global_io_executor (which is intended for blocking operations), sending results back to self in messages. The second is to spawn the actors that block on global_io_executor instead of global_fast_executor, e.g. AnIOActor.spawn name: :blocking, executor: Concurrent.global_io_executor.
require 'concurrent-edge'
class ActorDoingIO < Concurrent::Actor::RestartingContext
  def on_message(message)
    # do the IO operation here
  end

  # Run this actor on the IO executor instead of the fast executor.
  def default_executor
    Concurrent.global_io_executor
  end
end

actor_doing_io = ActorDoingIO.spawn :actor_doing_io
actor_doing_io.executor == Concurrent.global_io_executor

# A pool of workers doing IO
class IOWorker < Concurrent::Actor::Context
  def on_message(io_job)
    # sleep stands in for the IO work
    sleep 0.1
    puts "#{path} second:#{(Time.now.to_f*100).floor} message:#{io_job}"
  end

  def default_executor
    Concurrent.global_io_executor
  end
end

pool = Concurrent::Actor::Utils::Pool.spawn('pool', 2) do |index|
  IOWorker.spawn(name: "worker-#{index}")
end

pool << 1 << 2 << 3 << 4 << 5 << 6
# Give the workers time to print before the program exits.
sleep 1
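The first option, delegating blocking work and sending the result back as a message, can be sketched with the standard library alone. Here a plain Thread stands in for the IO executor and a Queue for the actor's mailbox; both are assumptions for illustration, and 'report.csv' is a made-up job name. The point is that the actor loop stays free to handle :ping while the slow job runs elsewhere.

```ruby
mailbox = Queue.new
log     = []

# Enqueue both messages before the actor starts so the order is deterministic.
mailbox << [:download, 'report.csv'] << [:ping, nil]

actor = Thread.new do
  loop do
    kind, payload = mailbox.pop
    case kind
    when :download
      # Hand the blocking call to another thread instead of blocking the loop.
      Thread.new do
        sleep 0.05                        # stands in for slow IO
        mailbox << [:downloaded, payload] # send the result back as a message
      end
    when :ping
      log << :pong
    when :downloaded
      log << [:downloaded, payload]
      break
    end
  end
end

actor.join
```

The :ping is answered before the download result arrives, because the actor thread never waits on the IO itself.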
Dead letter routing
See the AbstractContext#dead_letter_routing description:
Defines an actor responsible for dead letters. Any rejected message sent with Reference#tell is sent there; a message with a future is considered already monitored for failures. The default behaviour is to use AbstractContext#dead_letter_routing of the parent, so if no AbstractContext#dead_letter_routing method is overridden in the parent chain, the message ends up in the Actor.root.dead_letter_routing agent, which will log a warning.
FAQ
What happens if I try to supervise using a normal Context?
The alleged supervisor will receive errors from its supervised actors. They'll have to be handled manually.
How to change supervision strategy?
Use the option behaviour_definition: Behaviour.restarting_behaviour_definition(:resume!) or behaviour_definition: Behaviour.restarting_behaviour_definition(:reset!, :one_for_all).
Any existing behaviour can be subclassed.
How to implement custom restarting?
By subclassing Behaviour::Pausing and overriding Behaviour::Pausing#restart!. Implementing AbstractContext#on_event could also be considered.
We'll be happy to answer any other questions; just open an issue or find us on https://gitter.im/ruby-concurrency/concurrent-ruby.
Speed
A simple benchmark of Actor vs Celluloid. The numbers look good, but you know how it is with benchmarks. The source code is in examples/actor/celluloid_benchmark.rb. It sends numbers between x actors, adding 1 until a certain limit is reached.
Benchmark legend: mes. is the number of messages, act. is the number of actors, and impl. is the implementation (gem) used.
Rehearsal ---------------------------------------------------------
50000    2 concurrent  26.140000   0.610000  26.750000 (  7.761000)
50000    2 celluloid   41.440000   5.270000  46.710000 ( 17.535000)
50000  500 concurrent  11.340000   0.180000  11.520000 (  3.498000)
50000  500 celluloid   19.310000  10.680000  29.990000 ( 14.619000)
50000 1000 concurrent  10.640000   0.180000  10.820000 (  3.563000)
50000 1000 celluloid   17.840000  19.850000  37.690000 ( 18.892000)
50000 1500 concurrent  14.120000   0.290000  14.410000 (  4.618000)
50000 1500 celluloid   19.060000  28.920000  47.980000 ( 25.185000)
---------------------------------------------- total: 225.870000sec
mes.  act. impl.            user     system      total        real
50000    2 concurrent   7.320000   0.530000   7.850000 (  3.637000)
50000    2 celluloid   13.780000   4.730000  18.510000 ( 10.756000)
50000  500 concurrent   9.270000   0.140000   9.410000 (  3.020000)
50000  500 celluloid   16.540000  10.920000  27.460000 ( 14.308000)
50000 1000 concurrent   9.970000   0.160000  10.130000 (  3.445000)
50000 1000 celluloid   15.930000  20.840000  36.770000 ( 18.272000)
50000 1500 concurrent  11.580000   0.240000  11.820000 (  3.723000)
50000 1500 celluloid   19.440000  29.060000  48.500000 ( 25.227000) (1)
MRI 2.1.0
Rehearsal ---------------------------------------------------------
50000    2 concurrent   4.180000   0.080000   4.260000 (  4.269435)
50000    2 celluloid    7.740000   3.100000  10.840000 ( 10.043875)
50000  500 concurrent   5.900000   1.310000   7.210000 (  6.565067)
50000  500 celluloid   12.820000   5.810000  18.630000 ( 17.320765)
50000 1000 concurrent   6.080000   1.640000   7.720000 (  6.931294)
50000 1000 celluloid   17.130000   8.320000  25.450000 ( 23.786146)
50000 1500 concurrent   6.940000   2.030000   8.970000 (  7.927330)
50000 1500 celluloid   20.980000  12.040000  33.020000 ( 30.849578)
---------------------------------------------- total: 116.100000sec
mes.  act. impl.            user     system      total        real
50000    2 concurrent   3.730000   0.100000   3.830000 (  3.822688)
50000    2 celluloid    7.900000   2.910000  10.810000 (  9.924014)
50000  500 concurrent   5.420000   1.230000   6.650000 (  6.025579)
50000  500 celluloid   12.720000   5.540000  18.260000 ( 16.889517)
50000 1000 concurrent   5.420000   0.910000   6.330000 (  5.896689)
50000 1000 celluloid   16.090000   8.040000  24.130000 ( 22.347102)
50000 1500 concurrent   5.580000   0.760000   6.340000 (  6.038535)
50000 1500 celluloid   20.000000  11.680000  31.680000 ( 29.590774) (1)
Note (1): Celluloid uses a thread per actor, so this benchmark creates about 1500 native threads. Actor uses a constant number of threads.