.. module:: MySQL MySQL ========================= MySQL BinLog Processor ----------------------------------------------------------------------------------- .. automodule:: core_cdc.processors.mysql.mysql_binlog :members: :undoc-members: :show-inheritance: :private-members: How to Use ----------------------------------------------------------------------------------- While using library `core-cdc>=1.0.2` that uses `mysql-replication>=1.0.7` the value of variable `binlog_row_metadata` must be `FULL`. Let's create a MySql server using Docker... .. code-block:: python docker run \ --name=MySqlServer \ --env=MYSQL_ROOT_PASSWORD=mysql_password \ --volume=/var/lib/mysql \ -p 3306:3306 \ --restart=no \ --runtime=runc \ -d mysql:5.7 .. Check the value in the server... .. code-block:: sql SHOW VARIABLES LIKE 'binlog_row_metadata' .. Update the MySQL configuration file... This file is usually named my.cnf on Unix/Linux systems and my.ini on Windows. The location of this file can vary depending on your operating system and MySQL installation method. Common locations include `/etc/mysql/my.cnf`, `/etc/my.cnf`, or `/usr/local/mysql/my.cnf`. Add or modify the binlog_row_metadata option in the [mysqld] section of the configuration file. Set it to FULL to enable full metadata logging. .. code-block:: text [mysqld] binlog_row_metadata = FULL .. If you are using Docker based on `oraclelinux-slim` you can use: .. code-block:: shell docker exec -it {container-name} bash microdnf install nano nano /etc/my.cnf .. Then, the below example script showcase how to process the MySQL BinLog... .. code-block:: python # -*- coding: utf-8 -*- import logging import os from pprint import pprint from typing import Any from typing import List from core_mixins.logger import get_logger from pymysqlreplication import BinLogStreamReader from core_cdc.base import Record from core_cdc.processors.mysql_binlog import MySqlBinlogProcessor from core_cdc.targets.base import Target cxn_params = { "host": "localhost", "port": 3306, "user": "root", "passwd": "mysql_password" } logger = get_logger( log_level=int(os.getenv("LOGGER_LEVEL", str(logging.INFO))), reset_handlers=True) class CustomMySqlBinlogProcessor(MySqlBinlogProcessor): """ Custom class to implement required methods """ @classmethod def registered_name(cls) -> str: return cls.__name__ def process_dml_event(self, event: Any, **kwargs) -> List[Record]: recs = super().process_dml_event(event, **kwargs) logger.info("The following records will be processed...") for rec in recs: pprint(rec.to_json()) return recs class CustomTarget(Target): @classmethod def registered_name(cls) -> str: return cls.__name__ def _save(self, records: List[Record], **kwargs): logger.info(f"Saving: {records}") try: target = CustomTarget( logger=logger, execute_ddl=True, send_data=True) stream = BinLogStreamReader( resume_stream=False, connection_settings=cxn_params, blocking=True, freeze_schema=False, server_id=1) processor = CustomMySqlBinlogProcessor( stream=stream, targets=[target], service=os.getenv("SERVICE_NAME", "Functional-Tests"), logger=logger) processor.execute() except Exception as error: logger.error(f"An error has been raised. Error: {error}.") .. You will see something like... .. code-block:: text [INFO] connection_settings: {'host': 'localhost', 'port': 3306, 'user': 'root', 'passwd': 'mysql_password', 'charset': 'utf8'} [INFO] blocking: True [INFO] allowed_events_in_packet: frozenset({, , , , , , , , , , , , , , , , , , , , , , , }) [INFO] server_id: 1 [INFO] Reading events from the stream... [WARNING] Before using MARIADB 10.5.0 and MYSQL 8.0.14 versions, use python-mysql-replication version Before 1.0 version [INFO] Received event: RotateEvent. [INFO] File: c8db74e52957-bin.000002, Position: 4. [INFO] NEXT FILE: c8db74e52957-bin.000002. POSITION: 4. [INFO] Received event: FormatDescriptionEvent. [INFO] File: c8db74e52957-bin.000002, Position: 123. [INFO] Received event: PreviousGtidsEvent. [INFO] File: c8db74e52957-bin.000002, Position: 154. .. Let's execute some DDL and DML statements and follow the output in the console... Create database .. code-block:: shell CREATE DATABASE IF NOT EXISTS test_database; .. .. code-block:: text [INFO] Received event: QueryEvent. [INFO] File: c8db74e52957-bin.000002, Position: 418. [INFO] The below query was executed in: CustomTarget. [INFO] /* ApplicationName=DBeaver 24.3.0 - SQLEditor */ CREATE DATABASE IF NOT EXISTS test_database .. Create table .. code-block:: sql CREATE TABLE person ( id INT AUTO_INCREMENT PRIMARY KEY, first_name VARCHAR(50) NOT NULL, last_name VARCHAR(50) NOT NULL, email VARCHAR(100), birth_date DATE, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); .. .. code-block:: text [INFO] Received event: QueryEvent. [INFO] File: c8db74e52957-bin.000002, Position: 872. [INFO] The below query was executed in: CustomTarget. [INFO] /* ApplicationName=DBeaver 24.3.0 - SQLEditor */ CREATE TABLE person ( id INT AUTO_INCREMENT PRIMARY KEY, first_name VARCHAR(50) NOT NULL, last_name VARCHAR(50) NOT NULL, email VARCHAR(100), birth_date DATE, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) .. Inserting records .. code-block:: sql INSERT INTO person (first_name, last_name, email, birth_date) VALUES ('John', 'Doe', 'john.doe@example.com', '1990-01-15'), ('Jane', 'Smith', 'jane.smith@example.com', '1985-07-22'); .. .. code-block:: text [INFO] Received event: QueryEvent. [INFO] File: c8db74e52957-bin.000002, Position: 1018. [INFO] Received event: TableMapEvent. [INFO] File: c8db74e52957-bin.000002, Position: 1088. [INFO] Received event: WriteRowsEvent. [INFO] File: c8db74e52957-bin.000002, Position: 1211. [INFO] The following records will be processed... {'event_timestamp': 1733197735, 'event_type': 'INSERT', 'global_id': None, 'position': 1211, 'primary_key': '', 'record': {'id': 1, 'first_name': 'John', 'last_name': 'Doe', 'email': 'john.doe@example.com', 'birth_date': '1990-01-15', 'created_at': '2024-12-03T03:48:55'}, 'schema_name': 'test_database', 'service': 'Functional-Tests', 'source': 'c8db74e52957-bin.000002', 'table_name': 'person', 'transaction_id': None} {'event_timestamp': 1733197735, 'event_type': 'INSERT', 'global_id': None, 'position': 1211, 'primary_key': '', 'record': {'id': 2, 'first_name': 'Jane', 'last_name': 'Smith', 'email': 'jane.smith@example.com', 'birth_date': '1985-07-22', 'created_at': '2024-12-03T03:48:55'}, 'schema_name': 'test_database', 'service': 'Functional-Tests', 'source': 'c8db74e52957-bin.000002', 'table_name': 'person', 'transaction_id': None} [INFO] Saving: [, ] [INFO] 2 records were sent to: CustomTarget! [INFO] {'target': 'CustomTarget', 'number_of_records': 2, 'event_type': , 'schema': 'test_database', 'table': 'person'} [INFO] Received event: XidEvent. [INFO] File: c8db74e52957-bin.000002, Position: 1242. .. Updating a record .. code-block:: shell UPDATE person SET first_name = 'Jonathan', last_name = 'Dover' WHERE id = 1; ..