# -*- coding: utf-8 -*-
"""
MySQL BinLog processor for Change Data Capture
services.
"""
import re
from abc import abstractmethod
from typing import Any
from typing import Dict
from typing import Iterator
from typing import List
from typing import Optional

import pymysql
from core_mixins.utils import to_one_line
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.event import GtidEvent
from pymysqlreplication.event import QueryEvent
from pymysqlreplication.event import RotateEvent
from pymysqlreplication.event import XidEvent
from pymysqlreplication.row_event import DeleteRowsEvent
from pymysqlreplication.row_event import UpdateRowsEvent
from pymysqlreplication.row_event import WriteRowsEvent

from core_cdc.base import Record, EventType
from core_cdc.processors.base import IProcessor
class MySqlBinlogProcessor(IProcessor):
    """
    It processes the events from the BinLog files.

    The binary log contains “events” that describe database changes such as table creation
    operations or changes to table data. It also contains events for statements that potentially
    could have made changes (for example, a DELETE which matched no rows), unless row-based
    logging is used. The binary log also contains information about how long each
    statement took that updated data.

    When row values arrive keyed as ``UNKNOWN_COL<n>`` (column names not
    available in the event), the real names are looked up in
    ``INFORMATION_SCHEMA.COLUMNS`` through ``connection_settings`` and cached
    per ``schema.table``.

    More information:
    https://dev.mysql.com/doc/refman/8.0/en/binary-log.html
    """

    # Target of an ``ALTER TABLE`` statement: an optionally back-quoted
    # identifier, optionally qualified with a schema (``db.tbl``). Used to
    # work out which cached column list became stale.
    _ALTER_TABLE_PATTERN = re.compile(
        r"\balter\s+(?:(?:online|offline|ignore)\s+)*table\s+(?:if\s+exists\s+)?"
        r"(?P<first>`[^`]+`|[\w$]+)"
        r"(?:\s*\.\s*(?P<second>`[^`]+`|[\w$]+))?",
        re.IGNORECASE,
    )

    def __init__(
        self,
        stream: BinLogStreamReader,
        connection_settings: Optional[Dict] = None,
        **kwargs,
    ) -> None:
        """
        https://python-mysql-replication.readthedocs.io/en/stable/binlogstream.html

        :param stream: BinLogStreamReader object.
        :param connection_settings: Keyword arguments for ``pymysql.connect``,
            used to look up column names. When omitted, ``UNKNOWN_COL<n>``
            keys are returned unchanged.
        :param kwargs: Forwarded to the ``IProcessor`` constructor.
        """
        super().__init__(**kwargs)
        self.stream = stream

        # To keep the tracking of the processed elements...
        self.log_file: Optional[str] = None
        self.log_pos: Optional[int] = None
        self.gtid = None
        self.xid = None

        # Column names per "schema.table", kept in case they are not
        # available within the `event.rows` information...
        self._schema_cache: Dict[str, List[str]] = {}
        self.connection_settings = connection_settings

    def get_events(self) -> Iterator[Any]:
        """
        Yield every event read from the binlog stream.

        Before each event is yielded, the stream's current file/position are
        copied to ``log_file``/``log_pos``. An ``ALTER TABLE`` query drops the
        cached column names of the altered table.

        :return: Iterator over the raw pymysqlreplication events.
        """
        for event in self.stream:
            self.logger.info("Received event: %s.", event.__class__.__name__)
            self.log_file, self.log_pos = self.stream.log_file, self.stream.log_pos
            self.logger.info("File: %s, Position: %s.", self.log_file, self.log_pos)

            if isinstance(event, QueryEvent):
                query: str = to_one_line(event.query.lower())
                if "alter table" in query:
                    # Table schema has changed, cached columns are stale...
                    self._invalidate_schema_cache(event)

            yield event

            # Reached only when the consumer asks for the next event, so the
            # position is persisted after the yielded event was handled.
            if self.log_pos is not None:
                self._update_log_pos(self.log_pos)

    def get_event_type(self, event: Any) -> EventType:
        """
        Classify a binlog event.

        :param event: Event yielded by ``get_events``.
        :return: ``DDL_STATEMENT`` for any ``QueryEvent``, ``INSERT`` /
            ``UPDATE`` / ``DELETE`` for row events, ``GLOBAL`` otherwise.
        """
        if isinstance(event, QueryEvent):
            return EventType.DDL_STATEMENT
        if isinstance(event, WriteRowsEvent):
            return EventType.INSERT
        if isinstance(event, UpdateRowsEvent):
            return EventType.UPDATE
        if isinstance(event, DeleteRowsEvent):
            return EventType.DELETE
        return EventType.GLOBAL

    def process_event(self, event: Any) -> None:
        """
        Track transaction identifiers and binlog rotation.

        ``GtidEvent`` stores ``gtid``, ``XidEvent`` stores ``xid`` and a
        ``RotateEvent`` persists the next binlog file and position.

        :param event: Event yielded by ``get_events``.
        """
        if isinstance(event, GtidEvent):
            self.gtid = event.gtid
        elif isinstance(event, XidEvent):
            # NOTE(review): in MySQL binlogs the Xid event presumably comes
            # after the row events of its transaction, so records built in
            # `process_dml_event` may carry the previous transaction's xid.
            # Confirm.
            self.xid = event.xid
        elif isinstance(event, RotateEvent):
            self.logger.info("NEXT FILE: %s. POSITION: %s.", event.next_binlog, event.position)
            self._update_log_file(event.next_binlog)
            self._update_log_pos(event.position)

    def process_dml_event(self, event: Any) -> List[Record]:
        """
        Convert a row event into one ``Record`` per affected row.

        Inserts and deletes read each row's ``values``; updates read the
        ``after_values`` (the row state after the change).

        :param event: Write/Update/Delete rows event.
        :return: One Record per row, or an empty list when the event has no rows.
        """
        if isinstance(event, WriteRowsEvent):
            event_type, attr = EventType.INSERT, "values"
        elif isinstance(event, DeleteRowsEvent):
            event_type, attr = EventType.DELETE, "values"
        elif isinstance(event, UpdateRowsEvent):
            event_type, attr = EventType.UPDATE, "after_values"
        else:
            event_type, attr = EventType.GLOBAL, "values"

        # Handle case where event.rows might be None
        if not event.rows:
            return []

        return [
            Record(
                event_type=event_type,
                record=self._map_values_to_columns(
                    row.get(attr, {}),
                    event.schema,
                    event.table,
                ),
                service=self.service,
                schema_name=event.schema,
                table_name=event.table,
                primary_key=event.primary_key,
                global_id=self.gtid,
                transaction_id=self.xid,
                event_timestamp=event.timestamp,
                source=self.log_file,
                position=self.log_pos,
            )
            for row in event.rows
        ]

    def _invalidate_schema_cache(self, event: QueryEvent) -> None:
        """
        Drop cached column names made stale by an ``ALTER TABLE`` query.

        The altered table is parsed from the statement itself. When that is
        not possible, the whole cache is cleared so every table is re-fetched
        on demand by ``_map_values_to_columns``.

        :param event: QueryEvent whose query contains ``ALTER TABLE``.
        """
        default_schema = event.schema
        # QueryEvent may expose the default schema as raw bytes.
        if isinstance(default_schema, bytes):
            default_schema = default_schema.decode("utf-8", errors="replace")

        target = self._altered_table_key(event.query, default_schema)
        if target is None:
            self.logger.info("Could not identify altered table; clearing schema cache.")
            self._schema_cache.clear()
            return

        # Compare case-insensitively: the statement may spell the name with a
        # different case than the row events use as cache key.
        stale = [key for key in self._schema_cache if key.lower() == target.lower()]
        for key in stale:
            del self._schema_cache[key]
        self.logger.info("Schema change detected for %s; cached columns dropped.", target)

    @classmethod
    def _altered_table_key(cls, query: str, default_schema: Optional[str]) -> Optional[str]:
        """
        Extract the ``"schema.table"`` key targeted by an ``ALTER TABLE`` query.

        :param query: SQL text of the statement.
        :param default_schema: Schema used when the table name is unqualified.
        :return: The cache key, or None when it cannot be determined.
        """
        match = cls._ALTER_TABLE_PATTERN.search(query)
        if match is None:
            return None

        first = match.group("first").strip("`")
        second = match.group("second")
        if second:
            schema, table = first, second.strip("`")
        else:
            schema, table = default_schema, first

        if not schema:
            return None
        return f"{schema}.{table}"

    def _map_values_to_columns(
        self,
        values: Dict,
        schema: str,
        table: str,
    ) -> Dict:
        """
        Map UNKNOWN_COL* keys to actual column names.

        :param values: Dictionary with potentially UNKNOWN_COL keys
        :param schema: Database/schema name
        :param table: Table name
        :return: Dictionary with proper column names, or ``values`` unchanged
            when no key is UNKNOWN_COL* or the columns cannot be fetched.
        """
        keys = list(values.keys())
        if not any(str(k).startswith("UNKNOWN_COL") for k in keys):
            return values

        cache_key = f"{schema}.{table}"
        if cache_key in self._schema_cache:
            columns = self._schema_cache[cache_key]
        else:
            columns = self._fetch_table_columns(schema, table)

        if not columns:
            self.logger.warning("Could not map columns for `%s.%s`.", schema, table)
            return values

        mapped = {}
        used_keys = set()
        for idx, col_name in enumerate(columns):
            unknown_key = f"UNKNOWN_COL{idx}"
            if unknown_key in values:
                mapped[col_name] = values[unknown_key]
                used_keys.add(unknown_key)
            # Fallback: use positional mapping...
            elif idx < len(keys):
                mapped[col_name] = values[keys[idx]]
                used_keys.add(keys[idx])

        # More values than known columns (e.g. the table lost columns after
        # the event was written): those values are not in the result, so make
        # the loss visible instead of dropping them silently.
        unmapped = [k for k in keys if k not in used_keys]
        if unmapped:
            self.logger.warning(
                "Dropped %d unmapped value(s) for `%s.%s`: %s",
                len(unmapped), schema, table, unmapped,
            )

        return mapped

    def _fetch_table_columns(
        self,
        schema: str,
        table: str,
    ) -> List[str]:
        """
        Fetch column names from the database for a specific table.

        A new connection is opened from ``connection_settings`` for each
        call. Errors are logged and reported as an empty list.

        :param schema: Database/schema name
        :param table: Table name
        :return: List of column names in order (empty on failure)
        """
        cache_key = f"{schema}.{table}"
        if not self.connection_settings:
            self.logger.warning(
                "No connection_settings provided. Cannot fetch schema for %s", cache_key
            )
            return []

        try:
            with pymysql.connect(**self.connection_settings) as connection:
                with connection.cursor() as cursor:
                    query = """
                    SELECT COLUMN_NAME
                    FROM INFORMATION_SCHEMA.COLUMNS
                    WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s
                    ORDER BY ORDINAL_POSITION"""

                    cursor.execute(query, (schema, table))
                    columns = [row[0] for row in cursor.fetchall()]

                    self.logger.info(
                        "Fetched %d columns for %s: %s",
                        len(columns), cache_key, columns
                    )

                    # Cached even when empty, so the lookup is not repeated
                    # for every row of a table that returned no columns.
                    self._schema_cache[cache_key] = columns
                    return columns

        except Exception as error:  # pylint: disable=broad-exception-caught
            self.logger.error("Failed to fetch columns for %s: %s", cache_key, error)
            return []

    @abstractmethod
    def _update_log_file(self, log_file_name: str):
        """ Persist the current binlog file name so the stream can resume after restart. """

    @abstractmethod
    def _update_log_pos(self, position: int):
        """ Persist the current binlog position so the stream can resume after restart. """