A FlowableProcessor implementation that coordinates downstream requests through a front-buffer and stable-prefetching, optionally canceling the upstream if all subscribers have cancelled.
This processor does not have a public constructor by design; a new empty instance of this MulticastProcessor can be created via the following create methods that allow configuring it:
create(): creates an empty MulticastProcessor with Flowable.bufferSize() prefetch amount and no reference counting behavior.
create(int): creates an empty MulticastProcessor with the given prefetch amount and no reference counting behavior.
create(boolean): creates an empty MulticastProcessor with Flowable.bufferSize() prefetch amount and an optional reference counting behavior.
create(int, boolean): creates an empty MulticastProcessor with the given prefetch amount and an optional reference counting behavior.
When the reference counting behavior is enabled, the MulticastProcessor cancels its upstream when all Subscribers have cancelled. Late Subscribers will then be immediately completed.
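The reference counting behavior described above can be sketched as follows. This is a minimal illustration, not the library's own sample: it assumes RxJava 3 on the classpath (the io.reactivex.rxjava3 packages; RxJava 2 places the same classes under io.reactivex), and the class and method names RefCountSketch and lateSubscriberCompletes are hypothetical.

```java
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.processors.MulticastProcessor;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class; not part of RxJava.
class RefCountSketch {

    // Returns true if a Subscriber that arrives after the only earlier
    // Subscriber has cancelled is completed immediately.
    static boolean lateSubscriberCompletes(boolean refCount) {
        MulticastProcessor<Integer> mp = MulticastProcessor.create(4, refCount);
        mp.start(); // standalone use: initialize the internal buffer

        // Subscribe once, then cancel, so that no Subscribers remain.
        Disposable first = mp.subscribe(v -> { });
        first.dispose();

        // A late Subscriber records whether it receives onComplete.
        AtomicBoolean completed = new AtomicBoolean();
        mp.subscribe(v -> { }, e -> { }, () -> completed.set(true));
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("refCount=true:  " + lateSubscriberCompletes(true));
        System.out.println("refCount=false: " + lateSubscriberCompletes(false));
    }
}
```

With reference counting enabled the late Subscriber should be completed right away; without it, the processor stays open and the late Subscriber simply waits for items.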
Because MulticastProcessor implements the Subscriber interface, calling onSubscribe is mandatory (Rule 2.12). If MulticastProcessor should run standalone, i.e., without subscribing the MulticastProcessor to another Publisher, use the start() or startUnbounded() methods to initialize the internal buffer. Failing to do so will lead to a NullPointerException at runtime.
Use offer(Object) to try to emit an item without failing when the internal buffer is full; in that case the call returns false instead.
Because a MulticastProcessor is a Processor type in the Reactive Streams specification, nulls are not allowed (Rule 2.13) as parameters to onSubscribe(Subscription), offer(Object), onNext(Object) and onError(Throwable). Such calls will result in a NullPointerException being thrown, and the processor's state is not changed.
Since a MulticastProcessor is a Flowable, it supports backpressure. The backpressure from the currently subscribed Subscribers is coordinated by emitting upstream items only if all of those Subscribers have requested at least one item. This behavior is also called lockstep-mode because even if some Subscribers can take any number of items, other Subscribers requesting fewer items or requesting infrequently will slow down the overall throughput of the flow.
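A small sketch of lockstep-mode, under the assumption that RxJava 3 is on the classpath (RxJava 2 uses the io.reactivex packages instead). The class name LockstepSketch and its run method are hypothetical; TestSubscriber is RxJava's own test helper. One Subscriber requests an unbounded amount while the other requests a single item, so the unbounded one is held back until the slow one requests more.

```java
import io.reactivex.rxjava3.processors.MulticastProcessor;
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical demo class; not part of RxJava.
class LockstepSketch {

    // Returns the items seen by the unbounded Subscriber, first before and
    // then after the slow Subscriber requests two more items.
    static List<List<Integer>> run() {
        MulticastProcessor<Integer> mp = MulticastProcessor.create(8);
        mp.start(); // standalone use: initialize the internal buffer

        TestSubscriber<Integer> fast = mp.test();   // requests Long.MAX_VALUE
        TestSubscriber<Integer> slow = mp.test(1L); // requests a single item

        mp.offer(1);
        mp.offer(2);
        mp.offer(3);

        // Only as many items as the slowest Subscriber requested are emitted.
        List<Integer> before = new ArrayList<>(fast.values());

        slow.requestMore(2L); // both Subscribers now have outstanding demand
        List<Integer> after = new ArrayList<>(fast.values());

        return Arrays.asList(before, after);
    }

    public static void main(String[] args) {
        List<List<Integer>> seen = run();
        System.out.println("fast saw before: " + seen.get(0));
        System.out.println("fast saw after:  " + seen.get(1));
    }
}
```

The items offered while the slow Subscriber has no outstanding demand stay in the processor's buffer, which is why the buffer size passed to create(int) matters when Subscribers request at very different rates.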
Calling onNext(Object), offer(Object), onError(Throwable) and onComplete() is required to be serialized (called from the same thread or called non-overlappingly from different threads through external means of serialization). The FlowableProcessor.toSerialized() method available to all FlowableProcessors provides such serialization and also protects against reentrance (i.e., when a downstream Subscriber consuming this processor also wants to call onNext(Object) on this processor recursively).
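As an illustration of the serialization requirement, the sketch below wraps the processor with toSerialized() and lets two threads call onNext concurrently. It assumes RxJava 3 (RxJava 2 uses the io.reactivex packages); SerializedSketch and emitFromTwoThreads are hypothetical names. Note that start() is called before wrapping, because the returned FlowableProcessor view does not expose start() or offer(Object).

```java
import io.reactivex.rxjava3.processors.FlowableProcessor;
import io.reactivex.rxjava3.processors.MulticastProcessor;
import io.reactivex.rxjava3.subscribers.TestSubscriber;

// Hypothetical demo class; not part of RxJava.
class SerializedSketch {

    // Emits perThread items from each of two threads through the serialized
    // view and returns how many items the Subscriber received.
    static int emitFromTwoThreads(int perThread) throws InterruptedException {
        MulticastProcessor<Integer> mp = MulticastProcessor.create();
        mp.start(); // initialize the internal buffer before wrapping

        FlowableProcessor<Integer> serialized = mp.toSerialized();
        TestSubscriber<Integer> ts = serialized.test(); // unbounded request

        Runnable producer = () -> {
            for (int i = 0; i < perThread; i++) {
                serialized.onNext(i); // safe: calls are serialized by the wrapper
            }
        };
        Thread a = new Thread(producer);
        Thread b = new Thread(producer);
        a.start();
        b.start();
        a.join();
        b.join();

        serialized.onComplete();
        ts.assertNoErrors();
        ts.assertComplete();
        return ts.values().size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("items received: " + emitFromTwoThreads(1000));
    }
}
```

Calling onNext on the unwrapped processor from both threads would violate the contract above; the wrapper is one way to satisfy it when a single producer thread cannot be guaranteed.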
This MulticastProcessor supports the standard state-peeking methods hasComplete(), hasThrowable(), getThrowable() and hasSubscribers(). This processor doesn't allow peeking into its buffer.
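The state-peeking methods can be exercised as in the following sketch, which assumes RxJava 3 (RxJava 2 uses the io.reactivex packages). StatePeekSketch and check are hypothetical names, and the helper throws an AssertionError if any observed state differs from what the description above suggests.

```java
import io.reactivex.rxjava3.processors.MulticastProcessor;
import io.reactivex.rxjava3.subscribers.TestSubscriber;

// Hypothetical demo class; not part of RxJava.
class StatePeekSketch {

    static void require(boolean condition, String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }

    // Walks the processor through fresh, subscribed and failed states.
    static Throwable check() {
        MulticastProcessor<Integer> mp = MulticastProcessor.create();
        mp.start(); // standalone use: initialize the internal buffer

        // Fresh processor: not terminated, nobody subscribed.
        require(!mp.hasComplete(), "fresh processor reported completion");
        require(!mp.hasThrowable(), "fresh processor reported an error");
        require(mp.getThrowable() == null, "fresh processor holds a Throwable");
        require(!mp.hasSubscribers(), "fresh processor reported Subscribers");

        TestSubscriber<Integer> ts = mp.test();
        require(mp.hasSubscribers(), "Subscriber was not registered");

        // Terminate with an error and peek at the terminal state.
        IllegalStateException failure = new IllegalStateException("boom");
        mp.onError(failure);
        require(mp.hasThrowable(), "error state not reported");
        require(!mp.hasComplete(), "failed processor reported completion");
        ts.assertError(IllegalStateException.class);

        return mp.getThrowable();
    }

    public static void main(String[] args) {
        System.out.println("terminal error: " + check());
    }
}
```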
When this MulticastProcessor is terminated via onError(Throwable) or onComplete(), all previously signaled but not yet consumed items will still be available to Subscribers, and the respective terminal event is only emitted when all previous items have been successfully delivered to Subscribers. If there are no Subscribers, the remaining items will be buffered indefinitely.
The MulticastProcessor does not support clearing its cached events (to appear empty again).
MulticastProcessor does not operate by default on a particular Scheduler and the Subscribers get notified on an arbitrary thread in a serialized fashion.
Example:
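// Subscribed to an upstream source: onSubscribe is called by the source.
MulticastProcessor<Integer> mp = Flowable.range(1, 10)
    .subscribeWith(MulticastProcessor.create());
mp.test().assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
// --------------------
// Standalone use: start() initializes the internal buffer (prefetch of 4).
MulticastProcessor<Integer> mp2 = MulticastProcessor.create(4);
mp2.start();
assertTrue(mp2.offer(1));
assertTrue(mp2.offer(2));
assertTrue(mp2.offer(3));
assertTrue(mp2.offer(4));
// The buffer is full, so offer returns false instead of failing.
assertFalse(mp2.offer(5));
mp2.onComplete();
// Buffered items are delivered before the terminal event.
mp2.test().assertResult(1, 2, 3, 4);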
History: 2.1.14 - experimental