.. module:: MongoDB

MongoDB
=========================

Mongo Streams
-----------------------------------------------------------------------------------

.. automodule:: core_cdc.processors.mongo.mongo_stream
    :members:
    :undoc-members:
    :show-inheritance:
    :private-members:

How to Use
-----------------------------------------------------------------------------------

First, create a local cluster to test the example. Change Streams are only
available on replica sets and sharded clusters, so a standalone server is not
enough. We will use Docker to start a three-node replica set...

.. code-block:: shell

    docker network create mongoCluster
    docker run -d --rm -p 27017:27017 --name mongo1 --network mongoCluster mongo:5 mongod --replSet myReplicaSet --bind_ip localhost,mongo1
    docker run -d --rm -p 27018:27017 --name mongo2 --network mongoCluster mongo:5 mongod --replSet myReplicaSet --bind_ip localhost,mongo2
    docker run -d --rm -p 27019:27017 --name mongo3 --network mongoCluster mongo:5 mongod --replSet myReplicaSet --bind_ip localhost,mongo3

    docker exec -it mongo1 mongosh --eval "rs.initiate({
      _id: \"myReplicaSet\",
      members: [
        {_id: 0, host: \"mongo1\"},
        {_id: 1, host: \"mongo2\"},
        {_id: 2, host: \"mongo3\"}
      ]
    })"
..

Check the cluster status...

.. code-block:: shell

    docker ps
    docker exec -it mongo1 mongosh --eval "rs.status()"
..

Below is an example of how to consume and process MongoDB Change Streams using this library...

.. code-block:: python

    # -*- coding: utf-8 -*-

    import contextlib
    import json
    import logging
    import os
    from pprint import pprint
    from typing import Any, Dict, List

    from core_db.engines.mongo import MongoClient
    from core_mixins.logger import get_logger
    from pymongo.errors import PyMongoError

    from core_cdc.base import Record
    from core_cdc.processors.mongo_stream import MongoDbStreamProcessor
    from core_cdc.targets.base import Target

    # File used to persist the last resume token between runs.
    token_path = "./local_token.txt"

    logger = get_logger(
        logger_name="MongoDbStreamProcessorTestCases",
        log_level=int(os.getenv("LOGGER_LEVEL", str(logging.INFO))),
        reset_handlers=True)


    class CustomMongoDbStreamProcessor(MongoDbStreamProcessor):
        """ Custom class to implement required methods """

        @classmethod
        def registered_name(cls) -> str:
            return cls.__name__

        def process_dml_event(self, event: Any, **kwargs) -> List[Record]:
            recs = super().process_dml_event(event, **kwargs)
            logger.info("The following records will be processed...")
            for rec in recs:
                pprint(rec.to_json())

            return recs

        def save_resume_token(self, token: Dict):
            with open(token_path, mode="w+") as file_:
                file_.write(json.dumps(token))


    class CustomTarget(Target):
        @classmethod
        def registered_name(cls) -> str:
            return cls.__name__

        def _save(self, records: List[Record], **kwargs):
            logger.info(f"Saving: {records}")


    client = MongoClient(
        host="localhost", port=27017, database="test",
        username=None, password=None,
        directConnection=True)

    target = CustomTarget(
        logger=logger,
        execute_ddl=True,
        send_data=True)

    # Load the token saved by a previous run, if any. It stays None when the
    # file is missing or unreadable, so the stream starts from "now".
    resume_token = None
    with contextlib.suppress(Exception):
        with open(token_path) as file:
            resume_token = json.loads(file.read())

    try:
        logger.info("Connecting to MongoDB server...")
        client.connect()

        # "resume_after" is the pymongo argument to continue after a given token.
        with client.cxn.watch(full_document="updateLookup", resume_after=resume_token) as stream:
            processor = CustomMongoDbStreamProcessor(
                stream=stream,
                targets=[target],
                service=os.getenv("SERVICE_NAME", "Functional-Tests"),
                logger=logger)

            processor.execute()

    except PyMongoError as error:
        logger.error(f"An error has been raised. Error: {error}.")
..

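
The example persists the resume token in a plain JSON file. As a minimal sketch
of that idea in isolation, the two helpers below save and load such a file. The
names ``save_token`` and ``load_token`` are hypothetical and are not part of
``core_cdc``. The atomic write through a temporary file is an assumption added
here for robustness, not something the library is known to do...

.. code-block:: python

    import json
    import os
    import tempfile
    from typing import Any, Dict, Optional


    def save_token(path: str, token: Dict[str, Any]) -> None:
        """Write the token to a temp file, then atomically replace the target."""
        directory = os.path.dirname(os.path.abspath(path))
        fd, tmp_path = tempfile.mkstemp(dir=directory)
        with os.fdopen(fd, "w") as file_:
            json.dump(token, file_)

        # os.replace is atomic when source and target share a filesystem.
        os.replace(tmp_path, path)


    def load_token(path: str) -> Optional[Dict[str, Any]]:
        """Return the stored token, or None if the file is missing or invalid."""
        try:
            with open(path) as file_:
                return json.load(file_)

        except (FileNotFoundError, json.JSONDecodeError):
            return None
..

The value returned by ``load_token`` could be passed as ``resume_after`` to
pymongo's ``watch``, which accepts ``None`` when there is nothing to resume from.
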
Once the above script is executed...

.. code-block:: text

    [INFO] Connecting to MongoDB server...
    [INFO] Reading events from the stream...
..

Then, we can add a record via:

.. code-block:: shell

    mongosh "mongodb://localhost:27017/"
    use test
    db.createCollection("people")
    db.getCollection("people").insertOne({"name": "Alek", "age": 39})
..

The output...

.. code-block:: text

    [INFO] Received event: insert for document: 674d3c4778be6ac4a6210f24.
    [INFO] The following records will be processed...
    {'event_timestamp': 1733114951,
     'event_type': 'INSERT',
     'global_id': None,
     'position': 0,
     'primary_key': '_id',
     'record': {'_id': {'_data': '82674D3C47000000012B022C0100296E5A1004EED3A947B256417181A8398FEE8F22CD46645F69640064674D3C4778BE6AC4A6210F240004'},
                'clusterTime': 1733114951,
                'documentKey': {'_id': '674d3c4778be6ac4a6210f24'},
                'fullDocument': {'_id': '674d3c4778be6ac4a6210f24',
                                 'age': 39,
                                 'name': 'Alek'},
                'ns': {'coll': 'people', 'db': 'test'},
                'operationType': 'insert'},
     'schema_name': 'test',
     'service': 'Functional-Tests',
     'source': None,
     'table_name': 'people',
     'transaction_id': None}
    [INFO] Saving: []
    [INFO] 1 records were sent to: CustomTarget!
    [INFO] {'target': 'CustomTarget', 'number_of_records': 1, 'event_type': , 'schema': 'test', 'table': 'people'}
..

Now update the record with
``db.getCollection("people").updateOne({"name": "Alek"}, { "$set": {"age": 30}})``.
The output will be...

.. code-block:: text

    [INFO] Received event: update for document: 674d3c4778be6ac4a6210f24.
    [INFO] The following records will be processed...
    {'event_timestamp': 1733115348,
     'event_type': 'UPDATE',
     'global_id': None,
     'position': 0,
     'primary_key': '_id',
     'record': {'_id': {'_data': '82674D3DD4000000012B022C0100296E5A1004EED3A947B256417181A8398FEE8F22CD46645F69640064674D3C4778BE6AC4A6210F240004'},
                'clusterTime': 1733115348,
                'documentKey': {'_id': '674d3c4778be6ac4a6210f24'},
                'fullDocument': {'_id': '674d3c4778be6ac4a6210f24',
                                 'age': 30,
                                 'name': 'Alek'},
                'ns': {'coll': 'people', 'db': 'test'},
                'operationType': 'update',
                'updateDescription': {'removedFields': [],
                                      'truncatedArrays': [],
                                      'updatedFields': {'age': 30}}},
     'schema_name': 'test',
     'service': 'Functional-Tests',
     'source': None,
     'table_name': 'people',
     'transaction_id': None}
    [INFO] Saving: []
    [INFO] 1 records were sent to: CustomTarget!
    [INFO] {'target': 'CustomTarget', 'number_of_records': 1, 'event_type': , 'schema': 'test', 'table': 'people'}
..

Finally, delete it using ``db.getCollection("people").deleteOne({"name": "Alek"})``.
The console will show...

.. code-block:: text

    [INFO] Received event: delete for document: 674d3c4778be6ac4a6210f24.
    [INFO] The following records will be processed...
    {'event_timestamp': 1733115451,
     'event_type': 'DELETE',
     'global_id': None,
     'position': 0,
     'primary_key': '_id',
     'record': {'_id': {'_data': '82674D3E3B000000012B022C0100296E5A1004EED3A947B256417181A8398FEE8F22CD46645F69640064674D3C4778BE6AC4A6210F240004'},
                'clusterTime': 1733115451,
                'documentKey': {'_id': '674d3c4778be6ac4a6210f24'},
                'ns': {'coll': 'people', 'db': 'test'},
                'operationType': 'delete'},
     'schema_name': 'test',
     'service': 'Functional-Tests',
     'source': None,
     'table_name': 'people',
     'transaction_id': None}
    [INFO] Saving: []
    [INFO] 1 records were sent to: CustomTarget!
    [INFO] {'target': 'CustomTarget', 'number_of_records': 1, 'event_type': , 'schema': 'test', 'table': 'people'}
..

Stop the Docker containers (they were started with ``--rm``, so stopping also
removes them) and delete the network...

.. code-block:: shell

    docker stop mongo1 mongo2 mongo3
    docker network rm mongoCluster
..
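
The dictionaries printed for the insert, update and delete events share one
shape, which makes them easy to post-process in a target. As a rough sketch,
assuming only the keys visible in the outputs above, the hypothetical helper
``summarize`` (not part of ``core_cdc``) turns such a dictionary into a
one-line description...

.. code-block:: python

    from typing import Any, Dict


    def summarize(record: Dict[str, Any]) -> str:
        """Build a one-line description from a record dictionary."""
        payload = record["record"]
        key = payload["documentKey"]["_id"]

        # Only update events carry "updateDescription"; default to no changes.
        changed = payload.get("updateDescription", {}).get("updatedFields", {})

        line = (
            f"{record['event_type']} "
            f"{record['schema_name']}.{record['table_name']} _id={key}")

        if changed:
            line += f" changed={sorted(changed)}"

        return line


    # Trimmed copy of the update record shown in this page.
    sample = {
        "event_type": "UPDATE",
        "schema_name": "test",
        "table_name": "people",
        "record": {
            "documentKey": {"_id": "674d3c4778be6ac4a6210f24"},
            "updateDescription": {"updatedFields": {"age": 30}},
        },
    }

    print(summarize(sample))
    # UPDATE test.people _id=674d3c4778be6ac4a6210f24 changed=['age']
..

Reading ``updateDescription`` with ``.get`` keeps the helper usable for insert
and delete records, which do not include that key.
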