The MongoDB Scala Driver is an asynchronous and non blocking driver. Using the Observable
model asynchronous events become simple, composable operations, freed from the complexity of nested callbacks.
For asynchronous operations there are three interfaces Observable
, Subscription
and Observer
.
The interfaces are similar to Publisher
, Subscription
and Subscriber
interfaces from the reactive streams JVM implementation. However, we prefer the name Observerable
to Publisher
and Observer
to Subscriber
for readability purposes.
The Observable
represents a MongoDB operation which emits its results to the Observer
based on demand requested by the Subscription
to the Observable
.
Observables can be thought of as partial functions and like partial functions nothing happens until they are called. An Observable
can be subscribed to multiple times, with each subscription potentially causing new side effects eg: querying MongoDB or inserting data.
Introduced in 2.0.0 the SingleObservable
trait is an Observable
implementation that will only return a single item. It can be used in the same way as ordinary Observables
.
A Subscription
represents a one-to-one lifecycle of an Observer
subscribing to an Observable
. A Subscription
to an Observable
can only be used by a single Observer
. The purpose of a Subscription
is to control demand and to allow unsubscribing from the Observable
.
An Observer
provides the mechanism for receiving push-based notifications from the Observable
. Demand for these events is signalled by its Subscription
.
On subscription to an Observable[TResult]
the Observer
will be passed the Subscription
via the onSubscribe(subscription: Subscription)
. Demand for results is signaled via the Subscription
and any results are passed to the onNext(result: TResult)
method. If there is an error for any reason the onError(e: Throwable)
will be called and no more events passed to the Observer
. Alternatively, when the Observer
has consumed all the results from the Observable
the onComplete()
method will be called.
In the following example, the Subscription
is used to control demand when iterating an Observable
. The default Observer
implementation automatically requests all the data. Below we override the onSubscribe
method custom so we can manage the demand driven iteration of the Observable
:
collection.find().subscribe(new Observer[Document](){
var batchSize: Long = 10
var seen: Long = 0
var subscription: Option[Subscription] = None
override def onSubscribe(subscription: Subscription): Unit = {
this.subscription = Some(subscription)
subscription.request(batchSize)
}
override def onNext(result: Document): Unit = {
println(document.toJson())
seen += 1
if (seen == batchSize) {
seen = 0
subscription.get.request(batchSize)
}
}
override def onError(e: Throwable): Unit = println(s"Error: $e")
override def onComplete(): Unit = println("Completed")
})
Observable Helpers
The org.mongodb.scala
package provides improved interaction with the Java Observable
class via the ScalaObservable
implicit class. The extended functionality includes simple subscription via anonymous functions:
// Subscribe with custom onNext:
collection.find().subscribe((doc: Document) => println(doc.toJson()))
// Subscribe with custom onNext and onError
collection.find().subscribe((doc: Document) => println(doc.toJson()),
(e: Throwable) => println(s"There was an error: $e"))
// Subscribe with custom onNext, onError and onComplete
collection.find().subscribe((doc: Document) => println(doc.toJson()),
(e: Throwable) => println(s"There was an error: $e"),
() => println("Completed!"))
The ScalaObservable
implicit class also provides the following Monadic operators to make chaining and working with Observable
instances simpler:
GenerateHtmlObservable().andThen({
case Success(html: String) => renderHtml(html)
case Failure(t) => renderHttp500
})
The full list of Monadic operators available are:
andThen
: Allows the chaining of Observables.collect
: Collects all the results into a sequence.fallbackTo
: Allows falling back to an alternative Observable
if there is a failurefilter
: Filters results of the Observable
.flatMap
: Create a new Observable
by applying a function to each result of the Observable
.foldLeft
: Creates a new Observable that contains the single result of the applied accumulator function.foreach
: Applies a function applied to each emitted result.head
: Returns the head of the Observable
in a Future
.map
: Creates a new Observable by applying a function to each emitted result of the Observable.observeOn
: Creates a new Observable that uses a specific ExecutionContext
for future operations.recover
: Creates a new Observable
that will handle any matching throwable that this Observable
might contain by assigning it a value of another Observable
.recoverWith
: Creates a new Observable that will handle any matching throwable that this Observable might contain.toFuture
: Collects the Observable
results and converts to a Future
.transform
: Creates a new Observable
by applying the resultFunction function to each emitted result.withFilter
: Provides for-comprehensions support to Observables.zip
: Zips the values of this and that Observable
, and creates a new Observable
holding the tuple of their results.As we know that a SingleObservable[T]
will only return a single item the toFuture()
method will return a Future[T]
in the same way as the head
method does. There is also an implicit converter that converts an Observable
to a SingleObservable
RetroSearch is an open source project built by @garambo | Open a GitHub Issue
Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo
HTML:
3.2
| Encoding:
UTF-8
| Version:
0.7.4