MySQL
MySQL BinLog Processor
MySQL BinLog processor for Change Data Capture services.
- class core_cdc.processors.mysql.mysql_binlog.MySqlBinlogProcessor(stream: BinLogStreamReader, connection_settings: Dict | None = None, **kwargs)
Bases: IProcessor
It processes the events from the BinLog files.
The binary log contains “events” that describe database changes such as table creation operations or changes to table data. It also contains events for statements that potentially could have made changes (for example, a DELETE which matched no rows), unless row-based logging is used. The binary log also contains information about how long each statement took that updated data.
More information: https://dev.mysql.com/doc/refman/8.0/en/binary-log.html
- __init__(stream: BinLogStreamReader, connection_settings: Dict | None = None, **kwargs) → None
- Parameters:
stream – BinLogStreamReader object. See https://python-mysql-replication.readthedocs.io/en/stable/binlogstream.html
- process_event(event: Any) → None
Override this method when events other than DDL and DML events must be processed.
- process_dml_event(event: Any) → List[Record]
Processes a DML event and returns the records to be streamed.
- _map_values_to_columns(values: Dict, schema: str, table: str) → Dict
Map UNKNOWN_COL* keys to actual column names.
- Parameters:
values – Dictionary with potentially UNKNOWN_COL keys
schema – Database/schema name
table – Table name
- Returns:
Dictionary with proper column names
- _fetch_table_columns(schema: str, table: str) → List[str]
Fetch column names from the database for a specific table.
- Parameters:
schema – Database/schema name
table – Table name
- Returns:
List of column names in order
- abstractmethod _update_log_file(log_file_name: str)
Persist the current binlog file name so the stream can resume after restart.
- abstractmethod _update_log_pos(position: int)
Persist the current binlog position so the stream can resume after restart.
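The two private helpers above work together: column names are fetched from the server in table order, and row values whose keys are UNKNOWN_COL placeholders are renamed with them. The following is a minimal, standalone sketch of that idea, not the library's implementation. The function names, the information_schema query and the assumption that placeholder keys end in a zero-based column index (UNKNOWN_COL0, UNKNOWN_COL1 and so on) are illustrative; the cursor can be any DB-API cursor that returns tuples, such as PyMySQL's default cursor.

```python
import re
from typing import Any, Dict, List

# Standard information_schema query: column names in table order.
COLUMNS_QUERY = (
    "SELECT COLUMN_NAME FROM information_schema.COLUMNS "
    "WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s "
    "ORDER BY ORDINAL_POSITION"
)

# Assumption: placeholder keys look like "UNKNOWN_COL0", "UNKNOWN_COL1", ...
UNKNOWN_COL = re.compile(r"^UNKNOWN_COL(\d+)$")


def fetch_table_columns(cursor: Any, schema: str, table: str) -> List[str]:
    """Hypothetical helper: return column names in order via a DB-API cursor."""
    cursor.execute(COLUMNS_QUERY, (schema, table))
    # Each row is a 1-tuple such as ("first_name",).
    return [row[0] for row in cursor.fetchall()]


def map_values_to_columns(values: Dict[str, Any], columns: List[str]) -> Dict[str, Any]:
    """Hypothetical helper: rename UNKNOWN_COL<i> keys to columns[i]."""
    mapped: Dict[str, Any] = {}
    for key, value in values.items():
        match = UNKNOWN_COL.match(key)
        if match and int(match.group(1)) < len(columns):
            mapped[columns[int(match.group(1))]] = value
        else:
            # Real column names (or indexes we cannot resolve) are kept as-is.
            mapped[key] = value
    return mapped
```

Caching the column list per (schema, table) pair would avoid one query per event; the sketch omits that for brevity.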
How to Use
When using core-cdc>=1.0.2, which depends on mysql-replication>=1.0.7, the server variable binlog_row_metadata must be set to FULL.
Let’s create a MySQL server using Docker…
docker run \
--name=MySqlServer \
--env=MYSQL_ROOT_PASSWORD=mysql_password \
--volume=/var/lib/mysql \
-p 3306:3306 \
--restart=no \
--runtime=runc \
-d mysql:5.7
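Check the value on the server (binlog_row_metadata is not available in MySQL 5.7, so on the mysql:5.7 image above this query returns an empty result and the option below cannot be set; use a MySQL 8.0 image, for example mysql:8.0, if you need it)…
SHOW VARIABLES LIKE 'binlog_row_metadata';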
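The same check can be scripted, for example as a guard before starting the processor. A minimal sketch, assuming a DB-API cursor whose rows are (Variable_name, Value) tuples (PyMySQL's default cursor behaves this way); binlog_row_metadata_is_full is a hypothetical helper name.

```python
from typing import Any


def binlog_row_metadata_is_full(cursor: Any) -> bool:
    """Hypothetical helper: True if the server reports binlog_row_metadata = FULL."""
    cursor.execute("SHOW VARIABLES LIKE 'binlog_row_metadata'")
    row = cursor.fetchone()
    # No row means the variable does not exist on this server (e.g. MySQL 5.7).
    return row is not None and str(row[1]).upper() == "FULL"
```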
Update the MySQL configuration file… This file is usually named my.cnf on Unix/Linux systems and my.ini on Windows. The location of this file can vary depending on your operating system and MySQL installation method. Common locations include /etc/mysql/my.cnf, /etc/my.cnf, or /usr/local/mysql/my.cnf.
Add or modify the binlog_row_metadata option in the [mysqld] section of the configuration file. Set it to FULL to enable full metadata logging.
[mysqld]
binlog_row_metadata = FULL
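Before restarting the server, an edited option file can be checked roughly with Python's standard configparser. This is a hypothetical helper and only a sketch: configparser is not a complete my.cnf parser, so it is configured to accept valueless options (such as skip-host-cache) and !includedir lines inside a section, and included files are not followed.

```python
import configparser


def cnf_sets_full_row_metadata(cnf_text: str) -> bool:
    """Hypothetical helper: does [mysqld] set binlog_row_metadata = FULL?"""
    parser = configparser.ConfigParser(
        allow_no_value=True,  # options without "=", e.g. skip-host-cache
        strict=False,         # tolerate duplicated options
        interpolation=None,   # keep values verbatim
    )
    parser.read_string(cnf_text)
    if not parser.has_section("mysqld"):
        return False
    for key, value in parser.items("mysqld"):
        # MySQL treats '-' and '_' in option names as equivalent.
        if key.replace("-", "_") == "binlog_row_metadata":
            return value is not None and value.strip().upper() == "FULL"
    return False
```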
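If your Docker image is based on oraclelinux-slim, you can edit the file inside the container:
docker exec -it {container-name} bash
microdnf install nano
nano /etc/my.cnf
Then restart the container so the new setting takes effect:
docker restart {container-name}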
The example script below shows how to process the MySQL BinLog…
# -*- coding: utf-8 -*-
import logging
import os
from pprint import pprint
from typing import Any
from typing import List

from core_mixins.logger import get_logger
from pymysqlreplication import BinLogStreamReader

from core_cdc.base import Record
from core_cdc.processors.mysql.mysql_binlog import MySqlBinlogProcessor
from core_cdc.targets.base import Target

# Connection settings for the Docker container created above.
cxn_params = {
    "host": "localhost",
    "port": 3306,
    "user": "root",
    "passwd": "mysql_password"
}

logger = get_logger(
    log_level=int(os.getenv("LOGGER_LEVEL", str(logging.INFO))),
    reset_handlers=True)


class CustomMySqlBinlogProcessor(MySqlBinlogProcessor):
    """ Custom class to implement required methods """

    @classmethod
    def registered_name(cls) -> str:
        return cls.__name__

    def process_dml_event(self, event: Any, **kwargs) -> List[Record]:
        recs = super().process_dml_event(event, **kwargs)
        logger.info("The following records will be processed...")
        for rec in recs:
            pprint(rec.to_json())
        return recs

    def _update_log_file(self, log_file_name: str):
        # Abstract in the base class: persist the binlog file name so the
        # stream can resume after a restart. Kept in memory in this example.
        self._checkpoint_file = log_file_name

    def _update_log_pos(self, position: int):
        # Abstract in the base class: persist the binlog position.
        # Kept in memory in this example.
        self._checkpoint_pos = position


class CustomTarget(Target):
    """ Target that only logs the records it receives """

    @classmethod
    def registered_name(cls) -> str:
        return cls.__name__

    def _save(self, records: List[Record], **kwargs):
        logger.info(f"Saving: {records}")


try:
    target = CustomTarget(
        logger=logger, execute_ddl=True,
        send_data=True)

    stream = BinLogStreamReader(
        resume_stream=False,
        connection_settings=cxn_params,
        blocking=True,
        freeze_schema=False,
        server_id=1)

    processor = CustomMySqlBinlogProcessor(
        stream=stream,
        targets=[target],
        service=os.getenv("SERVICE_NAME", "Functional-Tests"),
        logger=logger)

    processor.execute()

except Exception as error:
    logger.error(f"An error has been raised. Error: {error}.")
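The abstract methods _update_log_file and _update_log_pos exist so the binlog file and position can be persisted, while the example above passes resume_stream=False without a saved position, so a restarted process does not continue where the previous one stopped. Below is a minimal sketch of a file-based checkpoint; FileCheckpoint and its JSON layout are hypothetical. resume_kwargs() returns the resume_stream, log_file and log_pos keyword arguments that BinLogStreamReader accepts.

```python
import json
import os
from typing import Any, Dict


class FileCheckpoint:
    """Hypothetical store for the binlog file name and position (JSON file)."""

    def __init__(self, path: str) -> None:
        self.path = path

    def _load(self) -> Dict[str, Any]:
        if not os.path.exists(self.path):
            return {}
        with open(self.path, encoding="utf-8") as fh:
            return json.load(fh)

    def _store(self, **changes: Any) -> None:
        data = self._load()
        data.update(changes)
        # Write to a temporary file, then rename, so a crash never leaves
        # a half-written checkpoint behind.
        tmp = self.path + ".tmp"
        with open(tmp, "w", encoding="utf-8") as fh:
            json.dump(data, fh)
        os.replace(tmp, self.path)

    def update_log_file(self, log_file_name: str) -> None:
        self._store(log_file=log_file_name)

    def update_log_pos(self, position: int) -> None:
        self._store(log_pos=position)

    def resume_kwargs(self) -> Dict[str, Any]:
        """Keyword arguments for BinLogStreamReader to continue from the checkpoint."""
        data = self._load()
        if "log_file" in data and "log_pos" in data:
            return {"resume_stream": True,
                    "log_file": data["log_file"],
                    "log_pos": data["log_pos"]}
        return {"resume_stream": False}
```

In a subclass, _update_log_file and _update_log_pos could delegate to update_log_file and update_log_pos, and the stream could be created with BinLogStreamReader(connection_settings=cxn_params, server_id=1, blocking=True, **checkpoint.resume_kwargs()). Persisting the position only after a transaction commits (an XidEvent) is a common way to avoid resuming in the middle of a transaction.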
You will see something like…
[INFO] connection_settings: {'host': 'localhost', 'port': 3306, 'user': 'root', 'passwd': 'mysql_password', 'charset': 'utf8'}
[INFO] blocking: True
[INFO] allowed_events_in_packet: frozenset({<class 'pymysqlreplication.event.GtidEvent'>, <class 'pymysqlreplication.event.RandEvent'>, <class 'pymysqlreplication.event.StopEvent'>, <class 'pymysqlreplication.event.MariadbGtidListEvent'>, <class 'pymysqlreplication.event.QueryEvent'>, <class 'pymysqlreplication.row_event.TableMapEvent'>, <class 'pymysqlreplication.row_event.UpdateRowsEvent'>, <class 'pymysqlreplication.event.FormatDescriptionEvent'>, <class 'pymysqlreplication.row_event.WriteRowsEvent'>, <class 'pymysqlreplication.row_event.DeleteRowsEvent'>, <class 'pymysqlreplication.event.MariadbAnnotateRowsEvent'>, <class 'pymysqlreplication.event.ExecuteLoadQueryEvent'>, <class 'pymysqlreplication.event.MariadbStartEncryptionEvent'>, <class 'pymysqlreplication.event.HeartbeatLogEvent'>, <class 'pymysqlreplication.event.XAPrepareEvent'>, <class 'pymysqlreplication.event.MariadbGtidEvent'>, <class 'pymysqlreplication.event.MariadbBinLogCheckPointEvent'>, <class 'pymysqlreplication.event.BeginLoadQueryEvent'>, <class 'pymysqlreplication.event.UserVarEvent'>, <class 'pymysqlreplication.event.XidEvent'>, <class 'pymysqlreplication.row_event.PartialUpdateRowsEvent'>, <class 'pymysqlreplication.event.RowsQueryLogEvent'>, <class 'pymysqlreplication.event.RotateEvent'>, <class 'pymysqlreplication.event.PreviousGtidsEvent'>})
[INFO] server_id: 1
[INFO] Reading events from the stream...
[WARNING]
Before using MARIADB 10.5.0 and MYSQL 8.0.14 versions,
use python-mysql-replication version Before 1.0 version
[INFO] Received event: RotateEvent.
[INFO] File: c8db74e52957-bin.000002, Position: 4.
[INFO] NEXT FILE: c8db74e52957-bin.000002. POSITION: 4.
[INFO] Received event: FormatDescriptionEvent.
[INFO] File: c8db74e52957-bin.000002, Position: 123.
[INFO] Received event: PreviousGtidsEvent.
[INFO] File: c8db74e52957-bin.000002, Position: 154.
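RotateEvent, FormatDescriptionEvent and PreviousGtidsEvent in this output are neither DDL nor DML events; according to the API reference, process_event is the method to implement when such events must be handled. The sketch below is a standalone, hypothetical class showing the kind of logic an override could contain. It dispatches on the event's class name so simple stand-in classes can be used for testing, and it relies on pymysqlreplication's RotateEvent exposing next_binlog and position and XidEvent exposing xid.

```python
from typing import Any, Callable, Dict, List


class OtherEventHandler:
    """Hypothetical dispatcher for events that are neither DDL nor DML."""

    def __init__(self) -> None:
        self.seen: List[str] = []
        self._handlers: Dict[str, Callable[[Any], None]] = {
            "RotateEvent": self._on_rotate,
            "XidEvent": self._on_xid,
        }

    def process_event(self, event: Any) -> None:
        # Look up a handler by class name; events without one are ignored.
        handler = self._handlers.get(type(event).__name__)
        if handler is not None:
            handler(event)

    def _on_rotate(self, event: Any) -> None:
        # The server switched (or announced) the binlog file to read next.
        self.seen.append(f"rotate:{event.next_binlog}:{event.position}")

    def _on_xid(self, event: Any) -> None:
        # XidEvent marks the commit of a transaction.
        self.seen.append(f"commit:{event.xid}")
```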
Let’s execute some DDL and DML statements and follow the output in the console…
Create database
CREATE DATABASE IF NOT EXISTS test_database;
[INFO] Received event: QueryEvent.
[INFO] File: c8db74e52957-bin.000002, Position: 418.
[INFO] The below query was executed in: CustomTarget.
[INFO] /* ApplicationName=DBeaver 24.3.0 - SQLEditor <Script-4.sql> */ CREATE DATABASE IF NOT EXISTS test_database
Create table
CREATE TABLE person (
id INT AUTO_INCREMENT PRIMARY KEY,
first_name VARCHAR(50) NOT NULL,
last_name VARCHAR(50) NOT NULL,
email VARCHAR(100),
birth_date DATE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
[INFO] Received event: QueryEvent.
[INFO] File: c8db74e52957-bin.000002, Position: 872.
[INFO] The below query was executed in: CustomTarget.
[INFO] /* ApplicationName=DBeaver 24.3.0 - SQLEditor <Script-4.sql> */ CREATE TABLE person (
id INT AUTO_INCREMENT PRIMARY KEY,
first_name VARCHAR(50) NOT NULL,
last_name VARCHAR(50) NOT NULL,
email VARCHAR(100),
birth_date DATE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
Inserting records
INSERT INTO person (first_name, last_name, email, birth_date)
VALUES
('John', 'Doe', 'john.doe@example.com', '1990-01-15'),
('Jane', 'Smith', 'jane.smith@example.com', '1985-07-22');
[INFO] Received event: QueryEvent.
[INFO] File: c8db74e52957-bin.000002, Position: 1018.
[INFO] Received event: TableMapEvent.
[INFO] File: c8db74e52957-bin.000002, Position: 1088.
[INFO] Received event: WriteRowsEvent.
[INFO] File: c8db74e52957-bin.000002, Position: 1211.
[INFO] The following records will be processed...
{'event_timestamp': 1733197735,
 'event_type': 'INSERT',
 'global_id': None,
 'position': 1211,
 'primary_key': '',
 'record': {'id': 1,
            'first_name': 'John',
            'last_name': 'Doe',
            'email': 'john.doe@example.com',
            'birth_date': '1990-01-15',
            'created_at': '2024-12-03T03:48:55'},
 'schema_name': 'test_database',
 'service': 'Functional-Tests',
 'source': 'c8db74e52957-bin.000002',
 'table_name': 'person',
 'transaction_id': None}
{'event_timestamp': 1733197735,
 'event_type': 'INSERT',
 'global_id': None,
 'position': 1211,
 'primary_key': '',
 'record': {'id': 2,
            'first_name': 'Jane',
            'last_name': 'Smith',
            'email': 'jane.smith@example.com',
            'birth_date': '1985-07-22',
            'created_at': '2024-12-03T03:48:55'},
 'schema_name': 'test_database',
 'service': 'Functional-Tests',
 'source': 'c8db74e52957-bin.000002',
 'table_name': 'person',
 'transaction_id': None}
[INFO] Saving: [<core_cdc.base.Record object at 0x7281f3b55340>, <core_cdc.base.Record object at 0x7281f3b55370>]
[INFO] 2 records were sent to: CustomTarget!
[INFO] {'target': 'CustomTarget', 'number_of_records': 2, 'event_type': <EventType.INSERT: 'INSERT'>, 'schema': 'test_database', 'table': 'person'}
[INFO] Received event: XidEvent.
[INFO] File: c8db74e52957-bin.000002, Position: 1242.
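In this output, the two-row INSERT arrived as a single WriteRowsEvent and produced two records. The sketch below is a simplified, hypothetical illustration of that flattening, not core-cdc's process_dml_event. It relies on pymysqlreplication row events exposing schema, table, timestamp and rows, where each row is {"values": ...} for inserts and deletes and {"before_values": ..., "after_values": ...} for updates; the output keys copy a subset of the record fields printed above.

```python
from typing import Any, Dict, List

# Mapping from pymysqlreplication row-event class names to event types.
EVENT_TYPES = {
    "WriteRowsEvent": "INSERT",
    "UpdateRowsEvent": "UPDATE",
    "DeleteRowsEvent": "DELETE",
}


def rows_to_records(event: Any) -> List[Dict[str, Any]]:
    """Hypothetical helper: turn one row event into one dict per affected row."""
    event_type = EVENT_TYPES.get(type(event).__name__)
    if event_type is None:
        raise ValueError(f"Not a row event: {type(event).__name__}")
    records = []
    for row in event.rows:
        # Updates carry before/after images; inserts and deletes carry "values".
        values = row["after_values"] if event_type == "UPDATE" else row["values"]
        records.append({
            "event_timestamp": event.timestamp,
            "event_type": event_type,
            "schema_name": event.schema,
            "table_name": event.table,
            "record": dict(values),
        })
    return records
```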
Updating a record
UPDATE person
SET first_name = 'Jonathan', last_name = 'Dover'
WHERE id = 1;