Source code for core_cdc.base
# -*- coding: utf-8 -*-
"""
Base classes and types for CDC (Change Data Capture)
operations. This module defines the core EventType enum
and Record class used across all CDC processors to
represent database change events.
"""
from __future__ import annotations
import json
from dataclasses import dataclass
from datetime import date
from datetime import datetime
from typing import Any
from typing import Dict
from typing import Optional
from typing import Tuple
from core_mixins.compatibility import StrEnum
[docs]
class EventType(StrEnum):
"""Enumeration of CDC event types."""
GLOBAL = "GLOBAL" # Event related to the processor itself.
DDL_STATEMENT = "QUERY" # Event for DDL statement (like create table, etc.).
INSERT = "INSERT" # Event for DML INSERT operation.
UPDATE = "UPDATE" # Event for DML UPDATE operation.
DELETE = "DELETE" # Event for DML DELETE operation.
[docs]
@dataclass
class Record: # pylint: disable=too-many-instance-attributes
"""
It provides a wrapper or common object useful for integration
across services that needs to handle data replication
via Change Data Capture producers and consumers...
:param event_type: It specifies the operation that generated the event.
:param record: The record (like a record/row in a database).
:param service: The service from where the record came.
:param schema_name: Schema from where the record was retrieved.
:param table_name: Table from where the record was retrieved.
:param primary_key: Attributes to use to identify a record as unique.
:param global_id: Unique identifier (e.g. Global Transaction Identifier in MySQL).
:param transaction_id: Identifier for the transaction (e.g. xid in MySQL).
:param event_timestamp: Timestamp when the record was generated.
:param source: The source from where the record was retrieved (e.g. binlog file name).
:param position: Record position in the source.
"""
event_type: EventType
record: Dict
service: str
schema_name: str
table_name: str
primary_key: Tuple[Any, ...] | str
global_id: Optional[str] = None
transaction_id: Optional[str] = None
event_timestamp: Optional[int] = None
source: Optional[str] = None
position: Optional[int] = None
def __str__(self) -> str:
return json.dumps(self.to_json())
[docs]
def to_json(self) -> Dict:
"""
It returns the JSON version of the record required to be streamed...
:return: A dictionary that follows the below structure:
Example::
{
"global_id": "",
"transaction_id": "",
"event_timestamp": 1653685384,
"event_type": "INSERT | UPDATE | DELETE",
"service": "service-name",
"source": "binlog.000001 | FileName | Something",
"position": 8077,
"primary_key": "(str | tuple)",
"schema_name": "schema_name",
"table_name": "table_name",
"record": {
...
// This is an example...
"id": "000-1",
"category": "Marketing",
"price": 100.0,
...
}
}
"""
return {
"global_id": self.global_id,
"transaction_id": self.transaction_id,
"event_timestamp": self.event_timestamp,
"event_type": self.event_type.value,
"service": self.service,
"source": self.source,
"position": self.position,
"primary_key": self.primary_key,
"schema_name": self.schema_name,
"table_name": self.table_name,
"record": {
key: value.isoformat() if type(value) in (datetime, date) else value
for key, value in self.record.items()
}
}