A RetroSearch Logo

Home - News ( United States | United Kingdom | Italy | Germany ) - Football scores

Search Query:

Showing content from http://mongodb.github.io/mongo-scala-driver/2.1/reference/observables/ below:

Observables

Observables

The MongoDB Scala Driver is an asynchronous and non blocking driver. Using the Observable model asynchronous events become simple, composable operations, freed from the complexity of nested callbacks.

For asynchronous operations there are three interfaces Observable, Subscription and Observer.

Note

The interfaces are similar to Publisher, Subscription and Subscriber interfaces from the reactive streams JVM implementation. However, we prefer the name Observerable to Publisher and Observer to Subscriber for readability purposes.

Observable

The Observable represents a MongoDB operation which emits its results to the Observer based on demand requested by the Subscription to the Observable.

important

Observables can be thought of as partial functions and like partial functions nothing happens until they are called. An Observable can be subscribed to multiple times, with each subscription potentially causing new side effects eg: querying MongoDB or inserting data.

SingleObservable

Introduced in 2.0.0 the SingleObservable trait is an Observable implementation that will only return a single item. It can be used in the same way as ordinary Observables.

Subscription

A Subscription represents a one-to-one lifecycle of an Observer subscribing to an Observable. A Subscription to an Observable can only be used by a single Observer. The purpose of a Subscription is to control demand and to allow unsubscribing from the Observable.

Observer

An Observer provides the mechanism for receiving push-based notifications from the Observable. Demand for these events is signalled by its Subscription.

On subscription to an Observable[TResult] the Observer will be passed the Subscription via the onSubscribe(subscription: Subscription). Demand for results is signaled via the Subscription and any results are passed to the onNext(result: TResult) method. If there is an error for any reason the onError(e: Throwable) will be called and no more events passed to the Observer. Alternatively, when the Observer has consumed all the results from the Observable the onComplete() method will be called.

Back Pressure

In the following example, the Subscription is used to control demand when iterating an Observable. The default Observer implementation automatically requests all the data. Below we override the onSubscribe method custom so we can manage the demand driven iteration of the Observable:

collection.find().subscribe(new Observer[Document](){

  var batchSize: Long = 10
  var seen: Long = 0
  var subscription: Option[Subscription] = None
  
  override def onSubscribe(subscription: Subscription): Unit = {
    this.subscription = Some(subscription)
    subscription.request(batchSize)
  }
  
  override def onNext(result: Document): Unit = {
    println(document.toJson())
    seen += 1
    if (seen == batchSize) {
      seen = 0
      subscription.get.request(batchSize)
    }
  }

  override def onError(e: Throwable): Unit = println(s"Error: $e")

  override def onComplete(): Unit = println("Completed")
})
Observable Helpers

The org.mongodb.scala package provides improved interaction with the Java Observable class via the ScalaObservable implicit class. The extended functionality includes simple subscription via anonymous functions:

// Subscribe with custom onNext:
collection.find().subscribe((doc: Document) => println(doc.toJson()))

// Subscribe with custom onNext and onError
collection.find().subscribe((doc: Document) => println(doc.toJson()),
                            (e: Throwable) => println(s"There was an error: $e"))

// Subscribe with custom onNext, onError and onComplete
collection.find().subscribe((doc: Document) => println(doc.toJson()),
                            (e: Throwable) => println(s"There was an error: $e"),
                            () => println("Completed!"))

The ScalaObservable implicit class also provides the following Monadic operators to make chaining and working with Observable instances simpler:

GenerateHtmlObservable().andThen({
  case Success(html: String) => renderHtml(html)
  case Failure(t) => renderHttp500
})

The full list of Monadic operators available are:

SingleObservable

As we know that a SingleObservable[T] will only return a single item the toFuture() method will return a Future[T] in the same way as the head method does. There is also an implicit converter that converts an Observable to a SingleObservable


RetroSearch is an open source project built by @garambo | Open a GitHub Issue

Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo

HTML: 3.2 | Encoding: UTF-8 | Version: 0.7.4