# -*- coding: utf-8 -*-
"""
MongoDB Change Streams processor for Change Data Capture
services.
"""
from abc import abstractmethod
from datetime import datetime
from datetime import timezone
from typing import Any
from typing import Dict
from typing import Iterator
from typing import List

from bson.objectid import ObjectId
from bson.timestamp import Timestamp
from pymongo.change_stream import ChangeStream
from pymongo.errors import ConnectionFailure
from pymongo.errors import CursorNotFound
from pymongo.errors import OperationFailure
from pymongo.errors import PyMongoError

from core_cdc.base import Record, EventType
from core_cdc.processors.base import IProcessor
[docs]
class MongoDbStreamProcessor(IProcessor):
    """
    It processes the events from the MongoDB Stream.

    A change stream is a real-time stream of database changes that flows
    from your database to your application. With change streams, your
    applications can react, in real time, to data changes in a single
    collection, a database, or even an entire deployment. For apps that
    rely on notifications of changing data, change streams are critical.

    The processor wraps an already-opened change stream, validates and
    yields its events, and maps DML events to ``Record`` objects.
    Subclasses must implement ``save_resume_token()`` to persist the
    resume token of each event.

    More information:
    https://www.mongodb.com/basics/change-streams
    """
[docs]
def __init__(
    self,
    stream: ChangeStream,
    save_full_event: bool = True,
    **kwargs,
) -> None:
    """
    Build a processor around an already-opened MongoDB change stream.

    :param stream: Change stream to consume, as returned by one of:

        * ``db.collection.watch()``
        * ``db.watch()``

    :param save_full_event: If True, the whole change event is used as
        the record payload, otherwise only its ``fullDocument``.
    :param kwargs: Forwarded unchanged to the parent ``__init__``.

    Example:

    .. code-block:: python

        # A single $match with $in selects several operation types.
        # (Chaining one $match per type would AND the conditions and
        # match nothing.)
        pipeline = [
            {'$match': {'operationType': {'$in': ['insert', 'replace']}}},
        ]

        MongoDbStreamProcessor(
            stream = MongoClient(...)["database"].<collection>.watch(pipeline)
        )

    ..

    Resume Token Storage: Subclasses must implement `save_resume_token()`
    to persist resume tokens. This enables stream recovery after
    interruptions or application restarts.

    Common storage patterns:

    - File-based: Simple JSON file storage
    - Database: Store in relational or NoSQL database
    - Redis/Cache: Fast in-memory storage with persistence
    - Cloud storage: S3, GCS, Azure Blob for distributed systems

    **Example implementation:**

    .. code-block:: python

        class MyProcessor(MongoDbStreamProcessor):
            def save_resume_token(self, token):
                # File-based storage
                with open('resume_token.json', 'w') as f:
                    json.dump(token, f)

    ..

    More information...

    * https://www.mongodb.com/basics/change-streams
    * https://www.mongodb.com/docs/manual/changeStreams/#open-a-change-stream
    * https://www.mongodb.com/docs/manual/reference/method/db.watch/#db.watch--
    """
    # Let the parent consume its own keyword arguments first.
    super().__init__(**kwargs)
    self.save_full_event, self.stream = save_full_event, stream
[docs]
def _validate_event(self, event: Dict) -> bool:
"""
Validates that a MongoDB change stream event has all required fields.
Required fields for all events:
- _id: Resume token
- operationType: Type of operation
- ns: Namespace (db and collection)
- clusterTime: Timestamp of the operation
:param event: MongoDB change stream event
:return: True if valid, False otherwise
"""
required_fields = ["_id", "operationType", "ns", "clusterTime"]
for field in required_fields:
if field not in event:
self.logger.error(
"Invalid event: missing required field '%s'. Event: %s",
field, event,
)
return False
# Validate ns structure (must have 'db' and 'coll' for collection operations)
ns = event.get("ns")
if not isinstance(ns, dict):
self.logger.error(
"Invalid event: 'ns' must be a dict. Got: %s. Event: %s",
type(ns).__name__, event,
)
return False
# For collection-level operations, ns must have db and coll
operation_type = event.get("operationType")
if operation_type not in ("dropDatabase", "invalidate"):
if "db" not in ns or "coll" not in ns:
self.logger.error(
"Invalid event: 'ns' missing 'db' or 'coll' for operation '%s'. Event: %s",
operation_type, event,
)
return False
# Validate documentKey exists for document-level operations
if operation_type in ("insert", "update", "replace", "delete"):
if "documentKey" not in event:
self.logger.error(
"Invalid event: '%s' operation missing 'documentKey'. Event: %s",
operation_type, event,
)
return False
return True
[docs]
def get_events(self) -> Iterator[Any]:
    """
    Yield validated change events from ``self.stream``, one at a time.

    For every event read from the stream:

    1. The structure is checked with ``_validate_event``. Events that fail
       are logged and skipped; their resume token is not saved.
    2. ``clusterTime`` is replaced in place with the ``time`` attribute of
       the BSON ``Timestamp``, or with ``None`` if the value is not a
       ``Timestamp``.
    3. The event is yielded. When the consumer requests the next event,
       the resume token (``_id``) is passed to ``save_resume_token``.

    A ``KeyError``, ``AttributeError`` or ``TypeError`` raised while
    handling a single event is logged and that event is skipped.

    :yields: Change stream events (mutated as described above).
    :raises ConnectionFailure: Logged, then re-raised.
    :raises CursorNotFound: Logged, then re-raised.
    :raises OperationFailure: Logged, then re-raised.
    :raises PyMongoError: Logged, then re-raised.
    :raises Exception: Any other error is logged with traceback and re-raised.
    """
    try:
        for change in self.stream:
            try:
                # Structural validation comes first; bad events are dropped.
                is_valid = self._validate_event(change)
                if not is_valid:
                    self.logger.warning(
                        "Skipping invalid event due to validation failure. "
                        "Operation: %s, Resume token: %s",
                        change.get("operationType", "unknown"),
                        change.get("_id", "N/A"),
                    )
                    continue

                # Short summary of what arrived.
                namespace = change.get("ns", {})
                document_key = change.get("documentKey", {})
                self.logger.info(
                    "Received event: %s on %s.%s, document: %s",
                    change.get("operationType"),
                    namespace.get("db", "unknown"),
                    namespace.get("coll", "unknown"),
                    document_key.get("_id", "N/A"),
                )

                # Full payload, for troubleshooting only.
                self.logger.debug("Full event data: %s", change)

                # Transaction details, when both identifiers are present.
                session_id = change.get("lsid")
                txn_number = change.get("txnNumber")
                if session_id and txn_number:
                    self.logger.debug(
                        "Event is part of transaction: lsid=%s, txnNumber=%s",
                        session_id, txn_number,
                    )

                # Normalise clusterTime: keep only Timestamp.time, else None.
                raw_cluster_time = change.get("clusterTime")
                change["clusterTime"] = (
                    raw_cluster_time.time
                    if isinstance(raw_cluster_time, Timestamp)
                    else None
                )

                self.logger.debug(
                    "Event timestamp: clusterTime=%s, wallTime=%s",
                    change["clusterTime"], change.get("wallTime", "N/A"),
                )

                yield change

                # Runs only after the consumer resumes the generator.
                token = change["_id"]
                self.save_resume_token(token)
                self.logger.debug("Resume token saved: %s", token)

            except (KeyError, AttributeError, TypeError) as error:
                # Per-event failure: report with context and move on.
                self.logger.error(
                    "Malformed event data encountered: %s. "
                    "Operation: %s, Namespace: %s, Resume token: %s. Skipping event.",
                    error,
                    change.get("operationType", "unknown"),
                    change.get("ns", "N/A"),
                    change.get("_id", "N/A"),
                    exc_info=True,
                )

    except ConnectionFailure as error:
        # Connection-level failure.
        self.logger.error(
            "Connection to MongoDB failed: %s. "
            "Check network connectivity and MongoDB server status.", error,
        )
        raise

    except CursorNotFound as error:
        # The server no longer knows the change stream cursor.
        self.logger.error(
            "Change stream cursor not found: %s. "
            "The cursor may have expired. Use resume tokens to continue from last position.",
            error,
        )
        raise

    except OperationFailure as error:
        # Server-side operation error.
        self.logger.error(
            "MongoDB operation failed: %s. "
            "Check permissions and change stream configuration.", error,
        )
        raise

    except PyMongoError as error:
        # Any remaining PyMongo error.
        self.logger.error("PyMongo error while reading change stream: %s", error)
        raise

    except Exception as error:
        # Anything unexpected: keep the traceback and propagate.
        self.logger.error("Unexpected error in change stream: %s", error, exc_info=True)
        raise
[docs]
def get_event_type(self, event: Dict) -> EventType:
    """
    Translate the event's ``operationType`` into a CDC ``EventType``.

    DML operations:

    - insert -> INSERT
    - replace, update -> UPDATE
    - delete -> DELETE

    DDL operations (logged at info level) -> DDL_STATEMENT:

    - drop, dropDatabase, rename
    - create, createIndexes, dropIndexes, modify
    - shardCollection, refineCollectionShardKey, reshardCollection

    Special operations (logged at warning level) -> GLOBAL:

    - invalidate
    - any other / unknown operation type

    :param event: MongoDB change stream event.
    :return: Corresponding EventType.
    """
    operation = event.get("operationType")

    # Data manipulation: straight name -> type lookup.
    dml_operations = (
        ("insert", EventType.INSERT),
        ("replace", EventType.UPDATE),
        ("update", EventType.UPDATE),
        ("delete", EventType.DELETE),
    )
    for name, mapped_type in dml_operations:
        if operation == name:
            return mapped_type

    # Data definition: collection, database, index and sharding changes.
    ddl_operations = (
        "drop",
        "dropDatabase",
        "rename",
        "create",
        "createIndexes",
        "dropIndexes",
        "modify",
        "shardCollection",
        "refineCollectionShardKey",
        "reshardCollection",
    )
    if operation in ddl_operations:
        self.logger.info("DDL operation detected: %s", operation)
        return EventType.DDL_STATEMENT

    if operation == "invalidate":
        # The stream cannot be used any more after this event.
        self.logger.warning(
            "Invalidate event received. Change stream is now invalid. "
            "This typically occurs after drop, dropDatabase, or rename operations. "
            "Use startAfter (not resumeAfter) to restart the stream."
        )
    else:
        # Unknown or future operation type.
        self.logger.warning("Unknown operation type: %s. Treating as GLOBAL event.", operation)

    return EventType.GLOBAL
[docs]
def process_dml_event(self, event: Any) -> List[Record]:  # pylint: disable=too-many-locals
    """
    Convert one DML change event into a single-element list of ``Record``.

    Steps:

    - Derive a transaction id ``"<lsid id>:<txnNumber>"`` when both ``lsid``
      and ``txnNumber`` are present.
    - Choose the event timestamp: ``wallTime`` (as whole epoch seconds) when
      available, otherwise the ``clusterTime`` already on the event.
    - Convert ``documentKey._id`` and ``fullDocument._id`` to ``str`` in
      place when they are ``ObjectId`` instances (the event is mutated).
    - Warn when an UPDATE event carries no ``fullDocument``.

    A naive ``wallTime`` is interpreted as UTC. PyMongo decodes BSON dates
    to naive UTC datetimes unless the client is timezone-aware, whereas
    ``datetime.timestamp()`` would read a naive value as local time.

    :param event: MongoDB change stream event; must contain ``ns.db`` and
        ``ns.coll``.
    :return: List with one ``Record``. Its payload is the whole event when
        ``save_full_event`` is True, otherwise ``fullDocument`` (an empty
        dict when that is missing or null).
    :raises KeyError: If ``ns``, ``ns.db`` or ``ns.coll`` is missing.
    """
    event_type = self.get_event_type(event)
    self.logger.debug("Processing DML event of type: %s", event_type)

    # lsid (logical session ID) + txnNumber uniquely identify a transaction.
    transaction_id = None
    if event.get("lsid") and event.get("txnNumber"):
        lsid_id = event["lsid"].get("id") if isinstance(event["lsid"], dict) else event["lsid"]
        transaction_id = f"{lsid_id}:{event['txnNumber']}"
        self.logger.debug("Event belongs to transaction: %s", transaction_id)

    # Default to clusterTime; prefer wallTime when the event has one.
    event_timestamp = event.get("clusterTime")
    wall_time = event.get("wallTime")
    if wall_time:
        if isinstance(wall_time, datetime):
            if wall_time.tzinfo is None:
                # Naive value: pin it to UTC so the epoch is not shifted by
                # the host's local timezone offset.
                wall_time = wall_time.replace(tzinfo=timezone.utc)
            event_timestamp = int(wall_time.timestamp())
            self.logger.debug("Using wallTime for event timestamp: %s", event_timestamp)
        elif hasattr(wall_time, "timestamp"):
            # Other datetime-like objects are converted as they are.
            event_timestamp = int(wall_time.timestamp())
            self.logger.debug("Using wallTime for event timestamp: %s", event_timestamp)

    schema_name: str = event["ns"]["db"]
    table_name: str = event["ns"]["coll"]

    self.logger.debug(
        "Event metadata: db=%s, table=%s, timestamp=%s",
        schema_name, table_name, event_timestamp,
    )

    # Safely convert documentKey._id to string if it's an ObjectId.
    if event.get("documentKey") and "_id" in event["documentKey"]:
        doc_key_id = event["documentKey"]["_id"]
        if isinstance(doc_key_id, ObjectId):
            self.logger.debug(
                "Converting documentKey._id from ObjectId to string: %s.", doc_key_id,
            )
            event["documentKey"]["_id"] = str(doc_key_id)

    # Safely convert fullDocument._id to string if it's an ObjectId.
    if event.get("fullDocument") and "_id" in event["fullDocument"]:
        full_doc_id = event["fullDocument"]["_id"]
        if isinstance(full_doc_id, ObjectId):
            self.logger.debug(
                "Converting fullDocument._id from ObjectId to string: %s.", full_doc_id,
            )
            event["fullDocument"]["_id"] = str(full_doc_id)

    # Warn when an UPDATE arrives without the post-image document.
    if event_type == EventType.UPDATE and not event.get("fullDocument"):
        # "or" fallbacks also cover keys that are present but set to None.
        doc_id = (event.get("documentKey") or {}).get("_id", "unknown")
        update_desc = event.get("updateDescription") or {}
        updated_fields = update_desc.get("updatedFields") or {}
        removed_fields = update_desc.get("removedFields") or []

        self.logger.warning(
            "UPDATE event for document %s missing fullDocument. "
            "UpdateDescription: %d fields updated, %d fields removed. "
            "Consider enabling 'full_document=\"updateLookup\"' in watch() options "
            "to include complete documents in UPDATE events.",
            doc_id, len(updated_fields), len(removed_fields),
        )

        if updated_fields or removed_fields:
            self.logger.debug(
                "Field changes - Updated: %s, Removed: %s",
                list(updated_fields.keys()), removed_fields,
            )

    # fullDocument may be absent or explicitly null; fall back to an empty
    # dict so len() below and downstream consumers never receive None.
    record_data = event if self.save_full_event else (event.get("fullDocument") or {})
    self.logger.debug("Creating record with %d top-level fields", len(record_data))

    return [
        Record(
            event_type=event_type,
            record=record_data,
            service=self.service,
            schema_name=schema_name,
            table_name=table_name,
            primary_key="_id",
            transaction_id=transaction_id,
            event_timestamp=event_timestamp,
        )
    ]
[docs]
@abstractmethod
def save_resume_token(self, token: Any) -> None:
    """
    Persist the resume token of the most recently processed event.

    The stored token can later be used to resume the change stream from
    that point. This method is abstract: each subclass supplies its own
    storage mechanism (e.g., file, database, cache, etc.).

    :param token: The resume token from the MongoDB change stream event
        (its ``_id`` field).
    """