MongoDB#
Mongo Streams#
MongoDB Change Streams processor for Change Data Capture services.
- class core_cdc.processors.mongo.mongo_stream.MongoDbStreamProcessor(stream: ChangeStream, save_full_event: bool = True, **kwargs)[source]#
Bases: IProcessor
It processes the events from the MongoDB Stream.
A change stream is a real-time stream of database changes that flows from your database to your application. With change streams, your applications can react—in real time—to data changes in a single collection, a database, or even an entire deployment. For apps that rely on notifications of changing data, change streams are critical.
More information: https://www.mongodb.com/basics/change-streams
- __init__(stream: ChangeStream, save_full_event: bool = True, **kwargs) None[source]#
- Parameters:
stream – ChangeStream object (for example, a CollectionChangeStream or DatabaseChangeStream).
save_full_event – If True, the whole event will be streamed; otherwise only fullDocument.
- To create a stream you can use:
db.collection.watch()
db.watch()
Example:
# A single $match stage: two consecutive $match stages on different
# operation types would filter out every event.
pipeline = [
    {'$match': {'operationType': {'$in': ['insert', 'replace']}}}
]
MongoDbStreamProcessor(
    stream=MongoClient(...)["database"].<collection>.watch(pipeline)
)
Resume Token Storage: Subclasses must implement save_resume_token() to persist resume tokens. This enables stream recovery after interruptions or application restarts.
Common storage patterns:
File-based: Simple JSON file storage
Database: Store in relational or NoSQL database
Redis/Cache: Fast in-memory storage with persistence
Cloud storage: S3, GCS, Azure Blob for distributed systems
Example implementation:
import json

class MyProcessor(MongoDbStreamProcessor):
    def save_resume_token(self, token):
        # File-based storage
        with open('resume_token.json', 'w') as f:
            json.dump(token, f)
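Saving the token is only half of recovery: after a restart the token also has to be read back. Below is a minimal sketch of a file-based store, assuming the token is the JSON-serializable _id document of the event. The helper names save_token and load_token are hypothetical and not part of the library. Writing to a temporary file and renaming it avoids leaving a half-written token if the process dies mid-write.

```python
import json
import os
import tempfile
from typing import Any, Dict, Optional


def save_token(path: str, token: Dict[str, Any]) -> None:
    """Atomically write the resume token to `path` as JSON (hypothetical helper)."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as tmp_file:
            json.dump(token, tmp_file)
        # os.replace swaps the file in a single step on the same filesystem.
        os.replace(tmp_path, path)
    except BaseException:
        # Clean up the temporary file if anything failed before the rename.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise


def load_token(path: str) -> Optional[Dict[str, Any]]:
    """Return the stored token, or None if there is no usable token yet."""
    try:
        with open(path) as token_file:
            return json.load(token_file)
    except (FileNotFoundError, json.JSONDecodeError):
        return None
```

With PyMongo, the loaded value can be passed as resume_after when opening the stream with watch(); None starts the stream from the current point.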
- _validate_event(event: Dict) bool[source]#
Validates that a MongoDB change stream event has all required fields.
Required fields for all events:
- _id: Resume token
- operationType: Type of operation
- ns: Namespace (db and collection)
- clusterTime: Timestamp of the operation
- Parameters:
event – MongoDB change stream event
- Returns:
True if valid, False otherwise
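As an illustration of the check described above, here is a minimal sketch that tests for the presence of the four required keys. It is not the library's implementation: the function names are hypothetical, and whether the real method also rejects None values is an assumption left open here.

```python
from typing import Any, Dict, List

# Field names taken from the list of required fields above.
REQUIRED_FIELDS = ("_id", "operationType", "ns", "clusterTime")


def missing_fields(event: Dict[str, Any]) -> List[str]:
    """Return the required keys that are absent from the event."""
    return [field for field in REQUIRED_FIELDS if field not in event]


def is_valid_event(event: Any) -> bool:
    """True when the event is a dict containing every required key."""
    return isinstance(event, dict) and not missing_fields(event)
```

Returning the list of missing keys, rather than only a boolean, makes it easier to log why an event was rejected.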
- get_events() Iterator[Any][source]#
Iterates through MongoDB change stream events with error handling.
Handles common MongoDB errors:
- ConnectionFailure: Network issues, auto-reconnect attempts
- CursorNotFound: Cursor expired (common in long-running streams)
- OperationFailure: Server-side errors (permissions, invalid operations)
- PyMongoError: Catch-all for other PyMongo exceptions
- Yields:
Change stream events
- Raises:
Re-raises exceptions after logging for caller to handle
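The "log, then re-raise" behaviour described above can be sketched with a plain generator. This is an illustration only, not the library's code: iter_events and its handled parameter are hypothetical names. With PyMongo, a caller would pass exception classes from pymongo.errors (for example PyMongoError) as handled.

```python
import logging
from typing import Any, Iterable, Iterator, Tuple, Type


def iter_events(
    stream: Iterable[Any],
    logger: logging.Logger,
    handled: Tuple[Type[BaseException], ...] = (Exception,),
) -> Iterator[Any]:
    """Yield events from the stream; log any handled error and re-raise it."""
    try:
        for event in stream:
            yield event
    except handled as error:
        # Log with the concrete exception type so the caller's logs show
        # whether it was a connection, cursor, or server-side problem.
        logger.error("Change stream failed with %s: %s", type(error).__name__, error)
        raise
```

Because the exception is re-raised, the caller decides whether to reopen the stream from the last saved resume token or to stop.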
- get_event_type(event: Dict) EventType[source]#
Maps MongoDB change stream operation types to CDC EventType.
DML Operations (Data Manipulation):
- insert -> INSERT
- replace, update -> UPDATE
- delete -> DELETE
DDL Operations (Data Definition):
- drop, dropDatabase, rename -> DDL_STATEMENT
- create, createIndexes, dropIndexes, modify -> DDL_STATEMENT
- shardCollection, refineCollectionShardKey, reshardCollection -> DDL_STATEMENT
Special Operations:
- invalidate -> GLOBAL (stream invalidated, requires restart)
- Other unknown operations -> GLOBAL
- Parameters:
event – MongoDB change stream event
- Returns:
Corresponding EventType
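The mapping above can be expressed as a lookup table with a fallback. The sketch below uses a stand-in EventType enum so that it runs on its own; the real enum lives in the library, and the string values given here for DDL_STATEMENT and GLOBAL are assumptions. The function name map_operation is hypothetical.

```python
from enum import Enum
from typing import Any, Dict


class EventType(Enum):
    """Stand-in for the library's EventType (only the members named above)."""
    INSERT = "INSERT"
    UPDATE = "UPDATE"
    DELETE = "DELETE"
    DDL_STATEMENT = "DDL_STATEMENT"  # value assumed
    GLOBAL = "GLOBAL"  # value assumed


DML_OPERATIONS = {
    "insert": EventType.INSERT,
    "replace": EventType.UPDATE,
    "update": EventType.UPDATE,
    "delete": EventType.DELETE,
}

DDL_OPERATIONS = frozenset({
    "drop", "dropDatabase", "rename",
    "create", "createIndexes", "dropIndexes", "modify",
    "shardCollection", "refineCollectionShardKey", "reshardCollection",
})


def map_operation(event: Dict[str, Any]) -> EventType:
    """Map an event's operationType to an EventType, defaulting to GLOBAL."""
    operation = event.get("operationType")
    if operation in DML_OPERATIONS:
        return DML_OPERATIONS[operation]
    if operation in DDL_OPERATIONS:
        return EventType.DDL_STATEMENT
    # Covers "invalidate" and any operation type not listed above.
    return EventType.GLOBAL
```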
- process_dml_event(event: Any) List[Record][source]#
It processes the event and returns the records to stream.
- abstractmethod save_resume_token(token: Any) None[source]#
It stores the token that can be used to resume the process from a certain point. Subclasses must implement this method to provide their own storage mechanism (e.g., file, database, cache).
- Parameters:
token – The resume token from MongoDB change stream (_id field).
How to Use#
First, let's create a local cluster to test the example. Change streams require a replica set (or a sharded cluster), so we use Docker to start a three-node replica set…
docker network create mongoCluster
docker run -d --rm -p 27017:27017 --name mongo1 --network mongoCluster mongo:5 mongod --replSet myReplicaSet --bind_ip localhost,mongo1
docker run -d --rm -p 27018:27017 --name mongo2 --network mongoCluster mongo:5 mongod --replSet myReplicaSet --bind_ip localhost,mongo2
docker run -d --rm -p 27019:27017 --name mongo3 --network mongoCluster mongo:5 mongod --replSet myReplicaSet --bind_ip localhost,mongo3
docker exec -it mongo1 mongosh --eval "rs.initiate({
_id: \"myReplicaSet\",
members: [
{_id: 0, host: \"mongo1\"},
{_id: 1, host: \"mongo2\"},
{_id: 2, host: \"mongo3\"}
]
})"
Check the cluster status…
docker ps
docker exec -it mongo1 mongosh --eval "rs.status()"
Below is an example of how to use and process MongoDB Change Streams using this library…
# -*- coding: utf-8 -*-

import contextlib
import json
import logging
import os
from pprint import pprint
from typing import Any, Dict, List

from core_db.engines.mongo import MongoClient
from core_mixins.logger import get_logger
from pymongo.errors import PyMongoError

from core_cdc.base import Record
# Module path as given in the class reference above.
from core_cdc.processors.mongo.mongo_stream import MongoDbStreamProcessor
from core_cdc.targets.base import Target

# File where the resume token is persisted between runs.
token_path = "./local_token.txt"

logger = get_logger(
    logger_name="MongoDbStreamProcessorTestCases",
    log_level=int(os.getenv("LOGGER_LEVEL", str(logging.INFO))),
    reset_handlers=True)

class CustomMongoDbStreamProcessor(MongoDbStreamProcessor):
    """ Custom class to implement required methods """

    @classmethod
    def registered_name(cls) -> str:
        return cls.__name__

    def process_dml_event(self, event: Any, **kwargs) -> List[Record]:
        recs = super().process_dml_event(event, **kwargs)
        logger.info("The following records will be processed...")
        for rec in recs:
            pprint(rec.to_json())

        return recs

    def save_resume_token(self, token: Dict):
        with open(token_path, mode="w+") as file_:
            file_.write(json.dumps(token))


class CustomTarget(Target):
    @classmethod
    def registered_name(cls) -> str:
        return cls.__name__

    def _save(self, records: List[Record], **kwargs):
        logger.info(f"Saving: {records}")


client = MongoClient(
    host="localhost", port=27017, database="test",
    username=None, password=None,
    directConnection=True)

target = CustomTarget(
    logger=logger, execute_ddl=True,
    send_data=True)
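
# Load the last saved resume token, if any, so the stream can continue
# from where it stopped. None means "start from the current point".
resume_token = None
with contextlib.suppress(Exception):
    with open(token_path) as file:
        resume_token = json.loads(file.read())

try:
    logger.info("Connecting to MongoDB server...")
    client.connect()

    # Assumes client.cxn is a PyMongo client, whose watch() accepts resume_after.
    with client.cxn.watch(
            full_document="updateLookup",
            resume_after=resume_token) as stream:
        processor = CustomMongoDbStreamProcessor(
            stream=stream,
            targets=[target],
            service=os.getenv("SERVICE_NAME", "Functional-Tests"),
            logger=logger)

        processor.execute()

except PyMongoError as error:
    logger.error(f"An error has been raised. Error: {error}.")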
Once the above script is executed, the console shows…
[INFO] Connecting to MongoDB server...
[INFO] Reading events from the stream...
Then, we can add a record via:
mongosh "mongodb://localhost:27017/"
use test
db.createCollection("people")
db.getCollection("people").insertOne({"name": "Alek", "age": 39})
The output…
[INFO] Received event: insert for document: 674d3c4778be6ac4a6210f24.
[INFO] The following records will be processed...
{'event_timestamp': 1733114951,
'event_type': 'INSERT',
'global_id': None,
'position': 0,
'primary_key': '_id',
'record': {'_id': {'_data': '82674D3C47000000012B022C0100296E5A1004EED3A947B256417181A8398FEE8F22CD46645F69640064674D3C4778BE6AC4A6210F240004'},
'clusterTime': 1733114951,
'documentKey': {'_id': '674d3c4778be6ac4a6210f24'},
'fullDocument': {'_id': '674d3c4778be6ac4a6210f24',
'age': 39,
'name': 'Alek'},
'ns': {'coll': 'people', 'db': 'test'},
'operationType': 'insert'},
'schema_name': 'test',
'service': 'Functional-Tests',
'source': None,
'table_name': 'people',
'transaction_id': None}
[INFO] Saving: [<core_cdc.base.Record object at 0x7ab441c21910>]
[INFO] 1 records were sent to: CustomTarget!
[INFO] {'target': 'CustomTarget', 'number_of_records': 1, 'event_type': <EventType.INSERT: 'INSERT'>, 'schema': 'test', 'table': 'people'}
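The numeric fields in this output can be decoded with the standard library. The sketch below assumes event_timestamp is in seconds since the Unix epoch, which is consistent with the document's ObjectId: the first four bytes of an ObjectId hold its creation time in seconds, and here both values coincide.

```python
from datetime import datetime, timezone

# Values copied from the insert event shown above.
event_timestamp = 1733114951
document_id = "674d3c4778be6ac4a6210f24"

# Interpret the event timestamp as UTC epoch seconds (assumption).
event_time = datetime.fromtimestamp(event_timestamp, tz=timezone.utc)

# The first 8 hex characters (4 bytes) of an ObjectId are its creation time.
created_at = datetime.fromtimestamp(int(document_id[:8], 16), tz=timezone.utc)

print(event_time.isoformat())    # 2024-12-02T04:49:11+00:00
print(created_at == event_time)  # True
```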
Let's update the record with db.getCollection("people").updateOne({"name": "Alek"}, {"$set": {"age": 30}}). The output will be…
[INFO] Received event: update for document: 674d3c4778be6ac4a6210f24.
[INFO] The following records will be processed...
{'event_timestamp': 1733115348,
'event_type': 'UPDATE',
'global_id': None,
'position': 0,
'primary_key': '_id',
'record': {'_id': {'_data': '82674D3DD4000000012B022C0100296E5A1004EED3A947B256417181A8398FEE8F22CD46645F69640064674D3C4778BE6AC4A6210F240004'},
'clusterTime': 1733115348,
'documentKey': {'_id': '674d3c4778be6ac4a6210f24'},
'fullDocument': {'_id': '674d3c4778be6ac4a6210f24',
'age': 30,
'name': 'Alek'},
'ns': {'coll': 'people', 'db': 'test'},
'operationType': 'update',
'updateDescription': {'removedFields': [],
'truncatedArrays': [],
'updatedFields': {'age': 30}}},
'schema_name': 'test',
'service': 'Functional-Tests',
'source': None,
'table_name': 'people',
'transaction_id': None}
[INFO] Saving: [<core_cdc.base.Record object at 0x7ab441e88ec0>]
[INFO] 1 records were sent to: CustomTarget!
[INFO] {'target': 'CustomTarget', 'number_of_records': 1, 'event_type': <EventType.UPDATE: 'UPDATE'>, 'schema': 'test', 'table': 'people'}
Let's delete it using db.getCollection("people").deleteOne({"name": "Alek"}). The console will show…
[INFO] Received event: delete for document: 674d3c4778be6ac4a6210f24.
[INFO] The following records will be processed...
{'event_timestamp': 1733115451,
'event_type': 'DELETE',
'global_id': None,
'position': 0,
'primary_key': '_id',
'record': {'_id': {'_data': '82674D3E3B000000012B022C0100296E5A1004EED3A947B256417181A8398FEE8F22CD46645F69640064674D3C4778BE6AC4A6210F240004'},
'clusterTime': 1733115451,
'documentKey': {'_id': '674d3c4778be6ac4a6210f24'},
'ns': {'coll': 'people', 'db': 'test'},
'operationType': 'delete'},
'schema_name': 'test',
'service': 'Functional-Tests',
'source': None,
'table_name': 'people',
'transaction_id': None}
[INFO] Saving: [<core_cdc.base.Record object at 0x7ab441e89130>]
[INFO] 1 records were sent to: CustomTarget!
[INFO] {'target': 'CustomTarget', 'number_of_records': 1, 'event_type': <EventType.DELETE: 'DELETE'>, 'schema': 'test', 'table': 'people'}
Stop the Docker containers (they were started with --rm, so stopping also removes them)…
docker stop mongo1 mongo2 mongo3