MongoDB

Mongo Streams

MongoDB Change Streams processor for Change Data Capture services.

class core_cdc.processors.mongo.mongo_stream.MongoDbStreamProcessor(stream: ChangeStream, save_full_event: bool = True, **kwargs)

Bases: IProcessor

It processes the events from the MongoDB Stream.

A change stream is a real-time stream of database changes that flows from your database to your application. With change streams, your applications can react—in real time—to data changes in a single collection, a database, or even an entire deployment. For apps that rely on notifications of changing data, change streams are critical.

More information: https://www.mongodb.com/basics/change-streams

__init__(stream: ChangeStream, save_full_event: bool = True, **kwargs) -> None
Parameters:
  • stream – ChangeStream object.

  • save_full_event – If True, the whole event is streamed; otherwise only its fullDocument field.

To create a stream you can use:
  • db.collection.watch()

  • db.watch()

Example:

# One $match with $in: two consecutive $match stages are ANDed and would match nothing.
pipeline = [
    {'$match': {'operationType': {'$in': ['insert', 'replace']}}}
]

MongoDbStreamProcessor(
    stream = MongoClient(...)["database"].<collection>.watch(pipeline)
)

Resume Token Storage: Subclasses must implement save_resume_token() to persist resume tokens. This enables stream recovery after interruptions or application restarts.

Common storage patterns:

  • File-based: Simple JSON file storage

  • Database: Store in relational or NoSQL database

  • Redis/Cache: Fast in-memory storage with persistence

  • Cloud storage: S3, GCS, Azure Blob for distributed systems

Example implementation:

import json

class MyProcessor(MongoDbStreamProcessor):
    def save_resume_token(self, token):
        # File-based storage: overwrite the file with the latest token
        with open('resume_token.json', 'w') as f:
            json.dump(token, f)
More information…
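The file-based pattern above can be made a little more robust by writing the token atomically and by pairing the save with a matching load. The following is a minimal sketch, not part of the library: the helper names save_token and load_token are hypothetical, and the token is assumed to be JSON-serializable (as the `{'_data': ...}` resume tokens shown later on this page are).

```python
import json
import os
import tempfile
from typing import Any, Optional


def save_token(path: str, token: Any) -> None:
    """Write the token to a temp file, then swap it into place (hypothetical helper)."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as tmp:
            json.dump(token, tmp)
        # os.replace overwrites the destination in one step, so a crash
        # mid-write never leaves a half-written token file behind.
        os.replace(tmp_path, path)
    except BaseException:
        # Remove the temp file if anything failed before the swap.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise


def load_token(path: str) -> Optional[Any]:
    """Return the stored token, or None when no usable token exists (hypothetical helper)."""
    try:
        with open(path) as f:
            return json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        return None
```

A subclass could call save_token from save_resume_token(), and the value returned by load_token could be passed as the resume_after argument of PyMongo's watch() when the stream is reopened.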
_validate_event(event: Dict) -> bool

Validates that a MongoDB change stream event has all required fields.

Required fields for all events:

  • _id: Resume token

  • operationType: Type of operation

  • ns: Namespace (db and collection)

  • clusterTime: Timestamp of the operation

Parameters:

event – MongoDB change stream event

Returns:

True if valid, False otherwise
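To make the check concrete, here is a minimal sketch of a required-fields validation based only on the field list documented above. It is an illustration, not the library's implementation; the names REQUIRED_FIELDS, missing_fields and is_valid_event are hypothetical.

```python
from typing import Any, Dict, List

# Field names taken from the list documented for _validate_event.
REQUIRED_FIELDS = ("_id", "operationType", "ns", "clusterTime")


def missing_fields(event: Dict[str, Any]) -> List[str]:
    """Return the required field names that are absent from the event."""
    return [name for name in REQUIRED_FIELDS if name not in event]


def is_valid_event(event: Dict[str, Any]) -> bool:
    """True when every required field is present, False otherwise."""
    return not missing_fields(event)
```

Returning the list of missing names separately makes it easy to log which field caused an event to be rejected.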

get_events() -> Iterator[Any]

Iterates through MongoDB change stream events with error handling.

Handles common MongoDB errors:

  • ConnectionFailure: Network issues, auto-reconnect attempts

  • CursorNotFound: Cursor expired (common in long-running streams)

  • OperationFailure: Server-side errors (permissions, invalid operations)

  • PyMongoError: Catch-all for other PyMongo exceptions

Yields:

Change stream events

Raises:

Re-raises exceptions after logging for caller to handle
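The log-then-re-raise behaviour described above can be sketched as a small generator wrapper. This is an assumption-laden illustration rather than the library's code: iter_logged is a hypothetical name, and the handled tuple defaults to Exception so the sketch runs without PyMongo installed. With PyMongo, one would pass the exception classes listed above instead.

```python
import logging
from typing import Any, Iterable, Iterator, Tuple, Type


def iter_logged(
    stream: Iterable[Any],
    logger: logging.Logger,
    handled: Tuple[Type[BaseException], ...] = (Exception,),
) -> Iterator[Any]:
    """Yield events from the stream; log any handled error, then re-raise it."""
    try:
        for event in stream:
            yield event
    except handled as error:
        # Log the failure with its type so the caller can tell a network
        # problem from a server-side error, then let it propagate.
        logger.error("Change stream failed: %s: %s", type(error).__name__, error)
        raise
```

Because the exception is re-raised, the caller still decides whether to reconnect, resume from a saved token, or stop.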

get_event_type(event: Dict) -> EventType

Maps MongoDB change stream operation types to CDC EventType.

DML Operations (Data Manipulation):

  • insert -> INSERT

  • replace, update -> UPDATE

  • delete -> DELETE

DDL Operations (Data Definition):

  • drop, dropDatabase, rename -> DDL_STATEMENT

  • create, createIndexes, dropIndexes, modify -> DDL_STATEMENT

  • shardCollection, refineCollectionShardKey, reshardCollection -> DDL_STATEMENT

Special Operations:

  • invalidate -> GLOBAL (stream invalidated, requires restart)

  • Other unknown operations -> GLOBAL

Parameters:

event – MongoDB change stream event

Returns:

Corresponding EventType
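The mapping above can be expressed as a lookup table with a fallback. The sketch below is illustrative only: the EventType enum is a local stand-in for the library's class (the member values for DDL_STATEMENT and GLOBAL are assumptions), and event_type_for is a hypothetical function name.

```python
from enum import Enum
from typing import Any, Dict


class EventType(Enum):
    """Local stand-in for the library's EventType (assumed member values)."""
    INSERT = "INSERT"
    UPDATE = "UPDATE"
    DELETE = "DELETE"
    DDL_STATEMENT = "DDL_STATEMENT"
    GLOBAL = "GLOBAL"


# DML operations map one-to-one (replace is treated as an update).
_DML = {
    "insert": EventType.INSERT,
    "replace": EventType.UPDATE,
    "update": EventType.UPDATE,
    "delete": EventType.DELETE,
}

# Every DDL operation listed above collapses to DDL_STATEMENT.
_DDL = frozenset({
    "drop", "dropDatabase", "rename",
    "create", "createIndexes", "dropIndexes", "modify",
    "shardCollection", "refineCollectionShardKey", "reshardCollection",
})


def event_type_for(event: Dict[str, Any]) -> EventType:
    """Map an event's operationType to an EventType, defaulting to GLOBAL."""
    operation = event.get("operationType")
    if operation in _DML:
        return _DML[operation]
    if operation in _DDL:
        return EventType.DDL_STATEMENT
    # invalidate and any unknown operation fall through to GLOBAL.
    return EventType.GLOBAL
```

Keeping the fallback at the end means a new operation type introduced by a later MongoDB release is classified as GLOBAL instead of raising an error.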

process_dml_event(event: Any) -> List[Record]

Processes the event and returns the records to stream.

abstractmethod save_resume_token(token: Any) -> None

Stores the token that can be used to resume processing from a given point. Subclasses must implement this method to provide their own storage mechanism (e.g., file, database, cache, etc.).

Parameters:

token – The resume token from MongoDB change stream (_id field).

_abc_impl = <_abc._abc_data object>
_impls: Dict[str, Type[Self]] = {}

How to Use

Change streams require a replica set, so first let’s create a local three-node cluster with Docker to test the example…

docker network create mongoCluster
docker run -d --rm -p 27017:27017 --name mongo1 --network mongoCluster mongo:5 mongod --replSet myReplicaSet --bind_ip localhost,mongo1
docker run -d --rm -p 27018:27017 --name mongo2 --network mongoCluster mongo:5 mongod --replSet myReplicaSet --bind_ip localhost,mongo2
docker run -d --rm -p 27019:27017 --name mongo3 --network mongoCluster mongo:5 mongod --replSet myReplicaSet --bind_ip localhost,mongo3

docker exec -it mongo1 mongosh --eval "rs.initiate({
 _id: \"myReplicaSet\",
 members: [
   {_id: 0, host: \"mongo1\"},
   {_id: 1, host: \"mongo2\"},
   {_id: 2, host: \"mongo3\"}
 ]
})"

Check the cluster status…

docker ps
docker exec -it mongo1 mongosh --eval "rs.status()"

Below is an example of how to process MongoDB Change Streams using this library…

# -*- coding: utf-8 -*-

import contextlib
import json
import logging
import os
from typing import List, Any, Dict
from pprint import pprint

from core_db.engines.mongo import MongoClient
from core_mixins.logger import get_logger
from pymongo.errors import PyMongoError

from core_cdc.base import Record
from core_cdc.processors.mongo.mongo_stream import MongoDbStreamProcessor
from core_cdc.targets.base import Target

token_path = "./local_token.txt"

logger = get_logger(
    logger_name="MongoDbStreamProcessorTestCases",
    log_level=int(os.getenv("LOGGER_LEVEL", str(logging.INFO))),
    reset_handlers=True)


class CustomMongoDbStreamProcessor(MongoDbStreamProcessor):
    """ Custom class to implement required methods """

    @classmethod
    def registered_name(cls) -> str:
        return cls.__name__

    def process_dml_event(self, event: Any, **kwargs) -> List[Record]:
        recs = super().process_dml_event(event, **kwargs)
        logger.info("The following records will be processed...")

        for rec in recs:
            pprint(rec.to_json())

        return recs

    def save_resume_token(self, token: Dict):
        with open(token_path, mode="w+") as file_:
            file_.write(json.dumps(token))


class CustomTarget(Target):
    @classmethod
    def registered_name(cls) -> str:
        return cls.__name__

    def _save(self, records: List[Record], **kwargs):
        logger.info(f"Saving: {records}")

client = MongoClient(
    host="localhost", port=27017, database="test",
    username=None, password=None,
    directConnection=True)

target = CustomTarget(
    logger=logger, execute_ddl=True,
    send_data=True)

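# Load the last saved resume token, if any, so the stream continues where it left off.
resume_token = None
with contextlib.suppress(Exception):
    with open(token_path) as file:
        resume_token = json.loads(file.read())

try:
    logger.info("Connecting to MongoDB server...")
    client.connect()

    with client.cxn.watch(full_document="updateLookup", resume_after=resume_token) as stream: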
        processor = CustomMongoDbStreamProcessor(
            stream=stream,
            targets=[target],
            service=os.getenv("SERVICE_NAME", "Functional-Tests"),
            logger=logger)

        processor.execute()

except PyMongoError as error:
    logger.error(f"An error has been raised. Error: {error}.")

Once the above script is executed…

[INFO] Connecting to MongoDB server...
[INFO] Reading events from the stream...

Then, we can add a record via:

mongosh "mongodb://localhost:27017/"
use test
db.createCollection("people")
db.getCollection("people").insert({"name": "Alek", "age": 39})

The output…

[INFO] Received event: insert for document: 674d3c4778be6ac4a6210f24.
[INFO] The following records will be processed...
{'event_timestamp': 1733114951,
 'event_type': 'INSERT',
 'global_id': None,
 'position': 0,
 'primary_key': '_id',
 'record': {'_id': {'_data': '82674D3C47000000012B022C0100296E5A1004EED3A947B256417181A8398FEE8F22CD46645F69640064674D3C4778BE6AC4A6210F240004'},
            'clusterTime': 1733114951,
            'documentKey': {'_id': '674d3c4778be6ac4a6210f24'},
            'fullDocument': {'_id': '674d3c4778be6ac4a6210f24',
                             'age': 39,
                             'name': 'Alek'},
            'ns': {'coll': 'people', 'db': 'test'},
            'operationType': 'insert'},
 'schema_name': 'test',
 'service': 'Functional-Tests',
 'source': None,
 'table_name': 'people',
 'transaction_id': None}
[INFO] Saving: [<core_cdc.base.Record object at 0x7ab441c21910>]
[INFO] 1 records were sent to: CustomTarget!
[INFO] {'target': 'CustomTarget', 'number_of_records': 1, 'event_type': <EventType.INSERT: 'INSERT'>, 'schema': 'test', 'table': 'people'}

Let’s update the record with db.getCollection("people").updateOne({"name": "Alek"}, {"$set": {"age": 30}}). The output will be…

[INFO] Received event: update for document: 674d3c4778be6ac4a6210f24.
[INFO] The following records will be processed...
{'event_timestamp': 1733115348,
 'event_type': 'UPDATE',
 'global_id': None,
 'position': 0,
 'primary_key': '_id',
 'record': {'_id': {'_data': '82674D3DD4000000012B022C0100296E5A1004EED3A947B256417181A8398FEE8F22CD46645F69640064674D3C4778BE6AC4A6210F240004'},
            'clusterTime': 1733115348,
            'documentKey': {'_id': '674d3c4778be6ac4a6210f24'},
            'fullDocument': {'_id': '674d3c4778be6ac4a6210f24',
                             'age': 30,
                             'name': 'Alek'},
            'ns': {'coll': 'people', 'db': 'test'},
            'operationType': 'update',
            'updateDescription': {'removedFields': [],
                                  'truncatedArrays': [],
                                  'updatedFields': {'age': 30}}},
 'schema_name': 'test',
 'service': 'Functional-Tests',
 'source': None,
 'table_name': 'people',
 'transaction_id': None}
[INFO] Saving: [<core_cdc.base.Record object at 0x7ab441e88ec0>]
[INFO] 1 records were sent to: CustomTarget!
[INFO] {'target': 'CustomTarget', 'number_of_records': 1, 'event_type': <EventType.UPDATE: 'UPDATE'>, 'schema': 'test', 'table': 'people'}

Let’s delete it with db.getCollection("people").remove({"name": "Alek"}). The console will show…

[INFO] Received event: delete for document: 674d3c4778be6ac4a6210f24.
[INFO] The following records will be processed...
{'event_timestamp': 1733115451,
 'event_type': 'DELETE',
 'global_id': None,
 'position': 0,
 'primary_key': '_id',
 'record': {'_id': {'_data': '82674D3E3B000000012B022C0100296E5A1004EED3A947B256417181A8398FEE8F22CD46645F69640064674D3C4778BE6AC4A6210F240004'},
            'clusterTime': 1733115451,
            'documentKey': {'_id': '674d3c4778be6ac4a6210f24'},
            'ns': {'coll': 'people', 'db': 'test'},
            'operationType': 'delete'},
 'schema_name': 'test',
 'service': 'Functional-Tests',
 'source': None,
 'table_name': 'people',
 'transaction_id': None}
[INFO] Saving: [<core_cdc.base.Record object at 0x7ab441e89130>]
[INFO] 1 records were sent to: CustomTarget!
[INFO] {'target': 'CustomTarget', 'number_of_records': 1, 'event_type': <EventType.DELETE: 'DELETE'>, 'schema': 'test', 'table': 'people'}

Stop the Docker containers (they were started with --rm, so stopping also removes them)…

docker stop mongo1 mongo2 mongo3