In this guide, you can learn how to use the Kotlin Sync driver to monitor a change stream, allowing you to view real-time changes to your database. A change stream is a MongoDB Server feature that publishes data changes on a collection, database, or deployment. Your application can subscribe to a change stream and use events to perform other actions.
The examples in this guide use the restaurants collection in the sample_restaurants database from the Atlas sample datasets. To learn how to create a free MongoDB Atlas cluster and load the sample datasets, see the Get Started with Atlas guide.
The following Kotlin data class models the documents in this collection:
data class Restaurant(
    val name: String,
    val cuisine: String,
)
To open a change stream, call the watch() method. The instance on which you call the watch() method determines the scope of events that the change stream listens for. You can call the watch() method on instances of the following classes:
MongoClient: To monitor all changes in the MongoDB deployment
MongoDatabase: To monitor changes in all collections in the database
MongoCollection: To monitor changes in the collection
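The three scopes listed above can be sketched as follows. This is a minimal illustration, not the guide's own code: the helper function names are hypothetical, and it assumes a MongoClient that you have already created for your deployment.

```kotlin
import com.mongodb.kotlin.client.ChangeStreamIterable
import com.mongodb.kotlin.client.MongoClient
import org.bson.Document

data class Restaurant(val name: String, val cuisine: String)

// Hypothetical helpers: each one opens a change stream at a different scope.
// Calling watch() only configures the stream; no events are read until you iterate it.

// Deployment scope: events from every database and collection.
fun watchDeployment(client: MongoClient): ChangeStreamIterable<Document> =
    client.watch()

// Database scope: events from every collection in sample_restaurants.
fun watchDatabase(client: MongoClient): ChangeStreamIterable<Document> =
    client.getDatabase("sample_restaurants").watch()

// Collection scope: events from the restaurants collection only.
fun watchCollection(client: MongoClient): ChangeStreamIterable<Restaurant> =
    client.getDatabase("sample_restaurants")
        .getCollection<Restaurant>("restaurants")
        .watch()
```

Iterating any of the returned objects, for example with forEach, blocks the calling thread while it waits for events, so an application typically consumes one stream per thread.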
The following example opens a change stream on the restaurants collection and prints changes as they occur:
collection.watch().forEach { change ->
    println(change)
}
To begin watching for changes, run the application. Then, in a separate application or shell, perform a write operation on the restaurants collection. The following example updates a document in which the value of the name field is "Blarney Castle":
import com.mongodb.client.model.Filters
import com.mongodb.client.model.Updates

val filter = Filters.eq(Restaurant::name.name, "Blarney Castle")
val update = Updates.set(Restaurant::cuisine.name, "Irish")
val result = collection.updateOne(filter, update)
When you update the collection, the change stream application prints the change as it occurs. The printed change event resembles the following:
{
  "_id": { ... },
  "operationType": "update",
  "clusterTime": { ... },
  "ns": {
    "db": "sample_restaurants",
    "coll": "restaurants"
  },
  "updateDescription": {
    "updatedFields": {
      "cuisine": "Irish"
    },
    "removedFields": [],
    "truncatedArrays": []
  }
  ...
}
You can pass the pipeline parameter to the watch() method to modify the change stream output. This parameter allows you to watch for only specified change events. Format the parameter as a list of objects that each represents an aggregation stage.
You can specify the following stages in the pipeline parameter:
$addFields
$match
$project
$replaceRoot
$replaceWith
$redact
$set
$unset
The following example passes a pipeline that includes a $match stage to open a change stream that records only update operations:
val pipeline = listOf(
    Aggregates.match(Filters.eq("operationType", "update"))
)
collection.watch(pipeline).forEach { change ->
    println(change)
}
To learn more about modifying your change stream output, see the Modify Change Stream Output section in the MongoDB Server manual.
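As a further illustration of the pipeline parameter, the following sketch matches more than one operation type in a single $match stage. The function names are hypothetical, and the choice of "insert" and "update" is only an example.

```kotlin
import com.mongodb.client.model.Aggregates
import com.mongodb.client.model.Filters
import com.mongodb.kotlin.client.MongoCollection
import org.bson.conversions.Bson

data class Restaurant(val name: String, val cuisine: String)

// Hypothetical helper: builds a one-stage pipeline that keeps only
// insert and update events. `in` is a Kotlin keyword, so it needs backticks.
fun insertOrUpdatePipeline(): List<Bson> = listOf(
    Aggregates.match(
        Filters.`in`("operationType", listOf("insert", "update"))
    )
)

// Opens a filtered change stream and prints the operation type of each event.
// This call blocks while it waits for events.
fun watchInsertsAndUpdates(collection: MongoCollection<Restaurant>) {
    collection.watch(insertOrUpdatePipeline()).forEach { change ->
        println("Operation: ${change.operationType}")
    }
}
```

Building the pipeline in its own function keeps the filter easy to inspect or reuse across several change streams.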
You can modify the behavior of the watch() method by chaining methods to the ChangeStreamIterable object that the watch() method call returns. If you don't specify any options, the driver does not customize the operation.
The following table describes methods you can use to customize the behavior of watch():
| Method | Description |
| --- | --- |
| batchSize() | Sets the number of documents to return per batch. |
| collation() | Specifies the kind of language collation to use when sorting results. For more information, see Collation in the MongoDB Server manual. |
| comment() | Specifies a comment to attach to the operation. |
| fullDocument() | Sets the fullDocument value. To learn more, see the Include Pre-Images and Post-Images section of this document. |
| fullDocumentBeforeChange() | Sets the fullDocumentBeforeChange value. To learn more, see the Include Pre-Images and Post-Images section of this document. |
| maxAwaitTime() | Sets the maximum await execution time on the server for this operation, in milliseconds. |
For a complete list of methods you can use to configure the watch() method, see the ChangeStreamIterable API documentation.
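The following sketch chains several of the methods from the table. The function name, batch size, comment text, and wait time are illustrative assumptions, not recommended values.

```kotlin
import com.mongodb.kotlin.client.ChangeStreamIterable
import com.mongodb.kotlin.client.MongoCollection
import java.util.concurrent.TimeUnit

data class Restaurant(val name: String, val cuisine: String)

// Hypothetical helper: configures a change stream before iterating it.
fun watchWithOptions(
    collection: MongoCollection<Restaurant>
): ChangeStreamIterable<Restaurant> =
    collection.watch()
        .batchSize(10)                             // up to 10 events per batch
        .comment("restaurant change stream")       // comment attached to the operation
        .maxAwaitTime(500, TimeUnit.MILLISECONDS)  // maximum await time on the server
```

Each method returns the same ChangeStreamIterable, so you can apply the options in any order and then iterate the result, for example with forEach.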
You can enable pre-images and post-images on collections only if your deployment uses MongoDB v6.0 or later.
By default, when you perform an operation on a collection, the corresponding change event includes only the delta of the fields modified by that operation. To see the full document before or after a change, chain the fullDocumentBeforeChange() or the fullDocument() methods to the watch() method.
The pre-image is the full version of a document before a change. To include the pre-image in the change stream event, pass one of the following options to the fullDocumentBeforeChange() method:
FullDocumentBeforeChange.WHEN_AVAILABLE: The change event includes a pre-image of the modified document only if the pre-image is available.
FullDocumentBeforeChange.REQUIRED: The change event includes a pre-image of the modified document. If the pre-image is not available, the driver raises an error.
The post-image is the full version of a document after a change. To include the post-image in the change stream event, pass one of the following options to the fullDocument() method:
FullDocument.UPDATE_LOOKUP: The change event includes a copy of the entire changed document from some time after the change.
FullDocument.WHEN_AVAILABLE: The change event includes a post-image of the modified document only if the post-image is available.
FullDocument.REQUIRED: The change event includes a post-image of the modified document. If the post-image is not available, the driver raises an error.
The following example calls the watch() method on a collection and includes the post-image of updated documents in the results by passing FullDocument.UPDATE_LOOKUP to the fullDocument() method:
collection.watch()
    .fullDocument(FullDocument.UPDATE_LOOKUP)
    .forEach { change ->
        println("Received a change: $change")
    }
With the change stream application running, updating a document in the restaurants collection by using the preceding update example prints a change event resembling the following:
ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."},
namespace=sample_restaurants.restaurants, destinationNamespace=null, fullDocument=Restaurant(name=Blarney Castle, cuisine=Irish),
fullDocumentBeforeChange=null, documentKey={"_id": {"$oid": "..."}},
clusterTime=Timestamp{value=..., seconds=..., inc=...},
updateDescription=UpdateDescription{removedFields=[], updatedFields={"cuisine": "Irish"},
truncatedArrays=[], disambiguatedPaths=null}, txnNumber=null, lsid=null, splitEvent=null,
wallTime=BsonDateTime{value=...}}
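In the preceding output, fullDocumentBeforeChange is null because the example does not request a pre-image. The following sketch shows one way to request pre-images. It assumes MongoDB 6.0 or later and enables pre-images and post-images on the collection with the server's collMod command and its changeStreamPreAndPostImages option, which this guide does not otherwise cover. The function names are hypothetical.

```kotlin
import com.mongodb.client.model.changestream.FullDocumentBeforeChange
import com.mongodb.kotlin.client.MongoCollection
import com.mongodb.kotlin.client.MongoDatabase
import org.bson.Document

data class Restaurant(val name: String, val cuisine: String)

// Hypothetical helper: builds the collMod command that turns on
// pre-image and post-image recording for the restaurants collection.
fun preImageCommand(): Document =
    Document("collMod", "restaurants")
        .append("changeStreamPreAndPostImages", Document("enabled", true))

// Runs the command once against the database that holds the collection.
fun enablePreAndPostImages(database: MongoDatabase) {
    database.runCommand(preImageCommand())
}

// Opens a change stream that includes the pre-image when one is available.
// This call blocks while it waits for events.
fun watchWithPreImages(collection: MongoCollection<Restaurant>) {
    collection.watch()
        .fullDocumentBeforeChange(FullDocumentBeforeChange.WHEN_AVAILABLE)
        .forEach { change ->
            println("Before the change: ${change.fullDocumentBeforeChange}")
        }
}
```

With WHEN_AVAILABLE, events for documents that have no stored pre-image still arrive, with a null fullDocumentBeforeChange value, instead of causing an error.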
To learn more about pre-images and post-images, see Change Streams with Document Pre- and Post-Images in the MongoDB Server manual.
To learn more about change streams, see Change Streams in the MongoDB Server manual.
To learn more about any of the methods or types discussed in this guide, see the following API documentation: