MySQL

MySQL BinLog Processor

MySQL BinLog processor for Change Data Capture services.

class core_cdc.processors.mysql.mysql_binlog.MySqlBinlogProcessor(stream: BinLogStreamReader, connection_settings: Dict | None = None, **kwargs)

Bases: IProcessor

Processes the events read from the MySQL binary log (BinLog) files.

The binary log contains “events” that describe database changes such as table creation operations or changes to table data. It also contains events for statements that potentially could have made changes (for example, a DELETE which matched no rows), unless row-based logging is used. The binary log also contains information about how long each statement took that updated data.

More information: https://dev.mysql.com/doc/refman/8.0/en/binary-log.html

__init__(stream: BinLogStreamReader, connection_settings: Dict | None = None, **kwargs) -> None

Parameters:
  • stream – BinLogStreamReader object. More information: https://python-mysql-replication.readthedocs.io/en/stable/binlogstream.html

get_events() -> Iterator[Any]

Returns an iterator over the events to process.

get_event_type(event: Any) -> EventType

Returns the EventType of the given event.
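A minimal sketch of one way to classify events, assuming the decision is made on the pymysqlreplication event class name. The `EventType` enum here is a hypothetical stand-in for the one core_cdc uses (only `EventType.INSERT` is confirmed by the output later on this page), and treating the remaining `QueryEvent`s as DDL assumes row-based logging.

```python
from enum import Enum
from typing import Any


class EventType(str, Enum):
    """Hypothetical stand-in for core_cdc's EventType."""
    INSERT = "INSERT"
    UPDATE = "UPDATE"
    DELETE = "DELETE"
    DDL = "DDL"
    OTHER = "OTHER"


# Row events carry the data changes; keys are pymysqlreplication class names.
_ROW_EVENTS = {
    "WriteRowsEvent": EventType.INSERT,
    "UpdateRowsEvent": EventType.UPDATE,
    "DeleteRowsEvent": EventType.DELETE,
}


def get_event_type(event: Any) -> EventType:
    name = type(event).__name__
    if name in _ROW_EVENTS:
        return _ROW_EVENTS[name]
    if name == "QueryEvent":
        # Transaction markers such as BEGIN are also logged as QueryEvent.
        query = str(getattr(event, "query", "")).strip().upper()
        if query not in ("BEGIN", "COMMIT"):
            # With row-based logging, the remaining statements are DDL.
            return EventType.DDL
    return EventType.OTHER
```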

process_event(event: Any) -> None

Override it when events other than DDL and DML events must be processed…
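For example, an override could track transaction commits (`XidEvent`) and binlog file switches (`RotateEvent`). The sketch below shows only the dispatch logic in a hypothetical standalone class; `xid` and `next_binlog` are attributes of those pymysqlreplication events, while the class and its fields are illustrative.

```python
from typing import Any, List, Optional


class ExtraEventHandler:
    """Hypothetical helper whose process_event body could back an override."""

    def __init__(self) -> None:
        self.committed_xids: List[int] = []
        self.current_file: Optional[str] = None

    def process_event(self, event: Any) -> None:
        name = type(event).__name__
        if name == "XidEvent":
            # An XidEvent marks the commit of a transaction.
            self.committed_xids.append(event.xid)
        elif name == "RotateEvent":
            # A RotateEvent announces the next binlog file to be read.
            self.current_file = event.next_binlog
        # Any other event type is ignored.
```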

process_dml_event(event: Any) -> List[Record]

Processes a DML event and returns the records to stream.
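As an illustration, the sketch below turns a pymysqlreplication rows event into plain dictionaries whose keys mirror part of the record output shown later on this page. It is not core_cdc's `Record` class and `rows_to_records` is a hypothetical name; it only relies on the `rows`, `schema`, `table` and `timestamp` attributes of row events, where `WriteRowsEvent` and `DeleteRowsEvent` rows hold `"values"` and `UpdateRowsEvent` rows hold `"before_values"` and `"after_values"`.

```python
from typing import Any, Dict, List

_ROW_EVENT_TYPES = {
    "WriteRowsEvent": "INSERT",
    "UpdateRowsEvent": "UPDATE",
    "DeleteRowsEvent": "DELETE",
}


def rows_to_records(event: Any) -> List[Dict[str, Any]]:
    event_type = _ROW_EVENT_TYPES[type(event).__name__]
    records = []
    for row in event.rows:
        # Updates carry before/after images; inserts and deletes carry "values".
        values = row["after_values"] if event_type == "UPDATE" else row["values"]
        records.append({
            "event_timestamp": event.timestamp,
            "event_type": event_type,
            "schema_name": event.schema,
            "table_name": event.table,
            "record": dict(values),
        })
    return records
```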

_map_values_to_columns(values: Dict, schema: str, table: str) -> Dict

Map UNKNOWN_COL* keys to actual column names.

Parameters:
  • values – Dictionary with potentially UNKNOWN_COL keys

  • schema – Database/schema name

  • table – Table name

Returns:

Dictionary with proper column names
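The mapping can be sketched as follows, assuming the placeholder keys have the form `UNKNOWN_COL<n>` with `n` being the zero-based column position, and that the column list comes from a lookup such as `_fetch_table_columns`. The function name and its `columns` argument are hypothetical; the documented method receives the schema and table instead.

```python
import re
from typing import Any, Dict, List

# Assumed placeholder format: UNKNOWN_COL followed by the column index.
_UNKNOWN_COL = re.compile(r"^UNKNOWN_COL(\d+)$")


def map_values_to_columns(values: Dict[str, Any], columns: List[str]) -> Dict[str, Any]:
    mapped: Dict[str, Any] = {}
    for key, value in values.items():
        match = _UNKNOWN_COL.match(key)
        if match and int(match.group(1)) < len(columns):
            # Placeholder key: resolve it through the column position.
            mapped[columns[int(match.group(1))]] = value
        else:
            # Real column name, or an index we cannot resolve: keep it as-is.
            mapped[key] = value
    return mapped
```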

_fetch_table_columns(schema: str, table: str) -> List[str]

Fetch column names from the database for a specific table.

Parameters:
  • schema – Database/schema name

  • table – Table name

Returns:

List of column names in order
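One way to fetch the ordered column names is to query `information_schema.COLUMNS`, sorted by `ORDINAL_POSITION`. This is a sketch assuming a DB-API cursor with the `%s` (format) parameter style, such as a PyMySQL cursor; `fetch_table_columns` is a hypothetical name and the processor's actual query may differ.

```python
from typing import Any, List

# information_schema.COLUMNS lists every column with its 1-based ORDINAL_POSITION.
COLUMNS_QUERY = (
    "SELECT COLUMN_NAME FROM information_schema.COLUMNS "
    "WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s "
    "ORDER BY ORDINAL_POSITION"
)


def fetch_table_columns(cursor: Any, schema: str, table: str) -> List[str]:
    """Return the column names of schema.table in definition order."""
    # Parameters are passed separately so the driver escapes them.
    cursor.execute(COLUMNS_QUERY, (schema, table))
    # Each row is a one-element tuple holding COLUMN_NAME.
    return [row[0] for row in cursor.fetchall()]
```

A cursor from a PyMySQL connection to the same server (default tuple rows) fits this signature.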

abstractmethod _update_log_file(log_file_name: str)

Persist the current binlog file name so the stream can resume after restart.

abstractmethod _update_log_pos(position: int)

Persist the current binlog position so the stream can resume after restart.
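A subclass has to decide where the checkpoint lives. The sketch below is a hypothetical JSON-file store that `_update_log_file` and `_update_log_pos` implementations could delegate to; the class name and file layout are assumptions, not part of core_cdc.

```python
import json
from pathlib import Path
from typing import Optional, Tuple


class FileCheckpoint:
    """Hypothetical JSON-file checkpoint for the binlog file and position."""

    def __init__(self, path: str) -> None:
        self.path = Path(path)

    def _read(self) -> dict:
        return json.loads(self.path.read_text()) if self.path.exists() else {}

    def _write(self, data: dict) -> None:
        # Write a temporary file first, then replace the checkpoint in one step.
        tmp = self.path.with_suffix(".tmp")
        tmp.write_text(json.dumps(data))
        tmp.replace(self.path)

    def update_log_file(self, log_file_name: str) -> None:
        data = self._read()
        data["log_file"] = log_file_name
        self._write(data)

    def update_log_pos(self, position: int) -> None:
        data = self._read()
        data["log_pos"] = position
        self._write(data)

    def load(self) -> Tuple[Optional[str], Optional[int]]:
        data = self._read()
        return data.get("log_file"), data.get("log_pos")
```

On restart, the saved values can be passed to `BinLogStreamReader(..., resume_stream=True, log_file=log_file, log_pos=log_pos)` so reading continues from the stored position.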

_abc_impl = <_abc._abc_data object>
_impls: Dict[str, Type[Self]] = {}

How to Use

When using core-cdc>=1.0.2, which depends on mysql-replication>=1.0.7, the server variable binlog_row_metadata must be set to FULL. This variable exists only in MySQL 8.0.1 and later, so the example server below runs MySQL 8.0.

Let’s create a MySQL server using Docker…

docker run \
   --name=MySqlServer \
   --env=MYSQL_ROOT_PASSWORD=mysql_password \
   --volume=/var/lib/mysql \
   -p 3306:3306 \
   --restart=no \
   --runtime=runc \
   -d mysql:8.0

Check the value in the server…

SHOW VARIABLES LIKE 'binlog_row_metadata'
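The check can also be scripted. The sketch below validates `(Variable_name, Value)` rows such as those returned by `SHOW VARIABLES WHERE Variable_name IN ('log_bin', 'binlog_format', 'binlog_row_metadata')`. Besides `binlog_row_metadata = FULL`, it assumes binary logging must be enabled (`log_bin = ON`) and that every data change should be logged as a row event (`binlog_format = ROW`); the function name is hypothetical.

```python
from typing import Dict, Iterable, List, Tuple

# Expected values; log_bin and binlog_format are assumptions added here.
REQUIRED_SETTINGS: Dict[str, str] = {
    "log_bin": "ON",
    "binlog_format": "ROW",
    "binlog_row_metadata": "FULL",
}


def check_binlog_settings(rows: Iterable[Tuple[str, str]]) -> List[str]:
    """Describe every required variable that is missing or has another value."""
    current = {name.lower(): str(value).upper() for name, value in rows}
    problems = []
    for name, expected in REQUIRED_SETTINGS.items():
        actual = current.get(name)
        if actual != expected:
            problems.append(f"{name} is {actual!r}, expected {expected!r}")
    return problems
```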

Update the MySQL configuration file… This file is usually named my.cnf on Unix/Linux systems and my.ini on Windows. The location of this file can vary depending on your operating system and MySQL installation method. Common locations include /etc/mysql/my.cnf, /etc/my.cnf, or /usr/local/mysql/my.cnf.

Add or modify the binlog_row_metadata option in the [mysqld] section of the configuration file. Set it to FULL to enable full metadata logging.

[mysqld]
binlog_row_metadata = FULL
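If you prefer to script the change, the hypothetical helper below sets an option inside the `[mysqld]` section of an option file's text, replacing an existing entry or adding one right after the first `[mysqld]` header. It is a sketch: it does not follow `!include`/`!includedir` directives, and the server still has to be restarted afterwards.

```python
from typing import List, Optional


def set_mysqld_option(cnf_text: str, key: str, value: str) -> str:
    """Return cnf_text with `key = value` set in the [mysqld] section."""
    lines: List[str] = cnf_text.splitlines()
    # MySQL treats dashes and underscores in option names as equivalent.
    wanted = key.replace("-", "_")
    in_mysqld = False
    insert_at: Optional[int] = None
    for i, line in enumerate(lines):
        stripped = line.strip()
        if stripped.startswith("["):
            in_mysqld = stripped.lower() == "[mysqld]"
            if in_mysqld and insert_at is None:
                # Remember the line right after the first [mysqld] header.
                insert_at = i + 1
            continue
        if in_mysqld:
            name = stripped.split("=", 1)[0].strip().replace("-", "_")
            if name == wanted:
                # Option already present: overwrite it in place.
                lines[i] = f"{key} = {value}"
                return "\n".join(lines) + "\n"
    if insert_at is None:
        # No [mysqld] section yet: append one.
        lines += ["[mysqld]", f"{key} = {value}"]
    else:
        lines.insert(insert_at, f"{key} = {value}")
    return "\n".join(lines) + "\n"
```

For example, with `path = Path("/etc/my.cnf")`, `path.write_text(set_mysqld_option(path.read_text(), "binlog_row_metadata", "FULL"))`.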

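If you are using a Docker image based on oraclelinux-slim, you can edit the file inside the container and then restart the container so the server picks up the new setting:

docker exec -it {container-name} bash
microdnf install nano
nano /etc/my.cnf
exit
docker restart {container-name}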

The example script below shows how to process the MySQL BinLog…

# -*- coding: utf-8 -*-

import logging
import os
from pprint import pprint
from typing import Any
from typing import List

from core_mixins.logger import get_logger
from pymysqlreplication import BinLogStreamReader

from core_cdc.base import Record
from core_cdc.processors.mysql.mysql_binlog import MySqlBinlogProcessor
from core_cdc.targets.base import Target

cxn_params = {
    "host": "localhost",
    "port": 3306,
    "user": "root",
    "passwd": "mysql_password"
}

logger = get_logger(
    log_level=int(os.getenv("LOGGER_LEVEL", str(logging.INFO))),
    reset_handlers=True)


class CustomMySqlBinlogProcessor(MySqlBinlogProcessor):
    """ Custom class to implement required methods """

    @classmethod
    def registered_name(cls) -> str:
        return cls.__name__

    def process_dml_event(self, event: Any, **kwargs) -> List[Record]:
        recs = super().process_dml_event(event, **kwargs)
        logger.info("The following records will be processed...")

        for rec in recs:
            pprint(rec.to_json())

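        return recs

    # _update_log_file and _update_log_pos are abstract in MySqlBinlogProcessor,
    # so a concrete subclass must implement them. A real service should persist
    # these values (file, database, ...) to resume after a restart; this example
    # only logs them.
    def _update_log_file(self, log_file_name: str):
        logger.debug(f"Current binlog file: {log_file_name}.")

    def _update_log_pos(self, position: int):
        logger.debug(f"Current binlog position: {position}.")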


class CustomTarget(Target):
    @classmethod
    def registered_name(cls) -> str:
        return cls.__name__

    def _save(self, records: List[Record], **kwargs):
        logger.info(f"Saving: {records}")


try:
    target = CustomTarget(
        logger=logger, execute_ddl=True,
        send_data=True)

    stream = BinLogStreamReader(
        resume_stream=False,
        connection_settings=cxn_params,
        blocking=True,
        freeze_schema=False,
        server_id=1)

    processor = CustomMySqlBinlogProcessor(
        stream=stream,
        targets=[target],
        service=os.getenv("SERVICE_NAME", "Functional-Tests"),
        logger=logger)

    processor.execute()

except Exception as error:
    logger.error(f"An error has been raised. Error: {error}.")
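`CustomTarget._save` above only logs the records. As a sketch of a target that persists them, the hypothetical helper below writes one JSON document per line, assuming `to_json()` returns a JSON-serializable dictionary as the printed records suggest; a `_save` implementation could call it with a file opened in append mode.

```python
import json
from typing import Any, Iterable, TextIO


def write_json_lines(records: Iterable[Any], out: TextIO) -> int:
    """Write one JSON document per record and return how many were written."""
    count = 0
    for rec in records:
        # default=str converts values json cannot encode (e.g. datetime) via str().
        out.write(json.dumps(rec.to_json(), sort_keys=True, default=str) + "\n")
        count += 1
    return count
```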

You will see something like…

[INFO] connection_settings: {'host': 'localhost', 'port': 3306, 'user': 'root', 'passwd': 'mysql_password', 'charset': 'utf8'}
[INFO] blocking: True
[INFO] allowed_events_in_packet: frozenset({<class 'pymysqlreplication.event.GtidEvent'>, <class 'pymysqlreplication.event.RandEvent'>, <class 'pymysqlreplication.event.StopEvent'>, <class 'pymysqlreplication.event.MariadbGtidListEvent'>, <class 'pymysqlreplication.event.QueryEvent'>, <class 'pymysqlreplication.row_event.TableMapEvent'>, <class 'pymysqlreplication.row_event.UpdateRowsEvent'>, <class 'pymysqlreplication.event.FormatDescriptionEvent'>, <class 'pymysqlreplication.row_event.WriteRowsEvent'>, <class 'pymysqlreplication.row_event.DeleteRowsEvent'>, <class 'pymysqlreplication.event.MariadbAnnotateRowsEvent'>, <class 'pymysqlreplication.event.ExecuteLoadQueryEvent'>, <class 'pymysqlreplication.event.MariadbStartEncryptionEvent'>, <class 'pymysqlreplication.event.HeartbeatLogEvent'>, <class 'pymysqlreplication.event.XAPrepareEvent'>, <class 'pymysqlreplication.event.MariadbGtidEvent'>, <class 'pymysqlreplication.event.MariadbBinLogCheckPointEvent'>, <class 'pymysqlreplication.event.BeginLoadQueryEvent'>, <class 'pymysqlreplication.event.UserVarEvent'>, <class 'pymysqlreplication.event.XidEvent'>, <class 'pymysqlreplication.row_event.PartialUpdateRowsEvent'>, <class 'pymysqlreplication.event.RowsQueryLogEvent'>, <class 'pymysqlreplication.event.RotateEvent'>, <class 'pymysqlreplication.event.PreviousGtidsEvent'>})
[INFO] server_id: 1
[INFO] Reading events from the stream...
[WARNING]
                    Before using MARIADB 10.5.0 and MYSQL 8.0.14 versions,
                    use python-mysql-replication version Before 1.0 version
[INFO] Received event: RotateEvent.
[INFO] File: c8db74e52957-bin.000002, Position: 4.
[INFO] NEXT FILE: c8db74e52957-bin.000002. POSITION: 4.
[INFO] Received event: FormatDescriptionEvent.
[INFO] File: c8db74e52957-bin.000002, Position: 123.
[INFO] Received event: PreviousGtidsEvent.
[INFO] File: c8db74e52957-bin.000002, Position: 154.

Let’s execute some DDL and DML statements and follow the output in the console…

Create database

CREATE DATABASE IF NOT EXISTS test_database;
[INFO] Received event: QueryEvent.
[INFO] File: c8db74e52957-bin.000002, Position: 418.
[INFO] The below query was executed in: CustomTarget.
[INFO] /* ApplicationName=DBeaver 24.3.0 - SQLEditor <Script-4.sql> */ CREATE DATABASE IF NOT EXISTS test_database

Create table (run it with test_database as the default database, for example after USE test_database;)

CREATE TABLE person (
    id INT AUTO_INCREMENT PRIMARY KEY,
    first_name VARCHAR(50) NOT NULL,
    last_name VARCHAR(50) NOT NULL,
    email VARCHAR(100),
    birth_date DATE,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
[INFO] Received event: QueryEvent.
[INFO] File: c8db74e52957-bin.000002, Position: 872.
[INFO] The below query was executed in: CustomTarget.
[INFO] /* ApplicationName=DBeaver 24.3.0 - SQLEditor <Script-4.sql> */ CREATE TABLE person (
    id INT AUTO_INCREMENT PRIMARY KEY,
    first_name VARCHAR(50) NOT NULL,
    last_name VARCHAR(50) NOT NULL,
    email VARCHAR(100),
    birth_date DATE,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)

Inserting records

INSERT INTO person (first_name, last_name, email, birth_date)
VALUES
('John', 'Doe', 'john.doe@example.com', '1990-01-15'),
('Jane', 'Smith', 'jane.smith@example.com', '1985-07-22');
[INFO] Received event: QueryEvent.
[INFO] File: c8db74e52957-bin.000002, Position: 1018.
[INFO] Received event: TableMapEvent.
[INFO] File: c8db74e52957-bin.000002, Position: 1088.
[INFO] Received event: WriteRowsEvent.
[INFO] File: c8db74e52957-bin.000002, Position: 1211.
[INFO] The following records will be processed...
{'event_timestamp': 1733197735,
 'event_type': 'INSERT',
 'global_id': None,
 'position': 1211,
 'primary_key': '',
 'record': {'id': 1,
            'first_name': 'John',
            'last_name': 'Doe',
            'email': 'john.doe@example.com',
            'birth_date': '1990-01-15',
            'created_at': '2024-12-03T03:48:55'},
 'schema_name': 'test_database',
 'service': 'Functional-Tests',
 'source': 'c8db74e52957-bin.000002',
 'table_name': 'person',
 'transaction_id': None}
{'event_timestamp': 1733197735,
 'event_type': 'INSERT',
 'global_id': None,
 'position': 1211,
 'primary_key': '',
 'record': {'id': 2,
            'first_name': 'Jane',
            'last_name': 'Smith',
            'email': 'jane.smith@example.com',
            'birth_date': '1985-07-22',
            'created_at': '2024-12-03T03:48:55'},
 'schema_name': 'test_database',
 'service': 'Functional-Tests',
 'source': 'c8db74e52957-bin.000002',
 'table_name': 'person',
 'transaction_id': None}
[INFO] Saving: [<core_cdc.base.Record object at 0x7281f3b55340>, <core_cdc.base.Record object at 0x7281f3b55370>]
[INFO] 2 records were sent to: CustomTarget!
[INFO] {'target': 'CustomTarget', 'number_of_records': 2, 'event_type': <EventType.INSERT: 'INSERT'>, 'schema': 'test_database', 'table': 'person'}
[INFO] Received event: XidEvent.
[INFO] File: c8db74e52957-bin.000002, Position: 1242.

Updating a record

UPDATE person
SET first_name = 'Jonathan', last_name = 'Dover'
WHERE id = 1;