Targets#

A target is the destination where CDC events are delivered after the processor captures them. It can be another database, a data warehouse, a message queue, an SNS topic, a file, or anything else your pipeline needs to write to.

core-cdc does not ship built-in target implementations. This is intentional: bundling specific connectors (Snowflake, MySQL, Kafka, …) would force every user to install dependencies they may not need and couple the library to third-party client libraries. Instead, ITarget defines a thin contract that you implement once in your own project using whatever connector fits your stack.

Design philosophy#

Decouple when possible. Directly writing CDC records into a target database inside _save is the simplest approach, but it creates tight coupling and makes it harder to add buffering, retries, or fan-out. The recommended pattern is to publish each Record (via record.to_json()) to a message queue or topic and let a separate consumer handle persistence. This keeps the CDC pipeline stateless and easy to scale.
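
As a minimal sketch of the decoupled pattern, the snippet below publishes each record as JSON to an in-process queue.Queue that stands in for a real broker (SQS, SNS, Kafka), and a separate consumer function drains it. FakeRecord, QueuePublisher and drain are hypothetical names, not core-cdc classes. The sketch also assumes that to_json() returns a JSON-serialisable dict; if the real method already returns a string, the json.dumps call can be dropped.

```python
import json
import queue
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class FakeRecord:
    """Hypothetical stand-in for core_cdc.base.Record."""
    schema_name: str
    table_name: str
    record: Dict[str, Any] = field(default_factory=dict)

    def to_json(self) -> Dict[str, Any]:
        # Assumption: the real to_json() yields something JSON-serialisable.
        return {"schema": self.schema_name, "table": self.table_name, "record": self.record}


class QueuePublisher:
    """Publishes records to a queue instead of writing to a database."""

    def __init__(self, broker: queue.Queue) -> None:
        self.broker = broker

    def _save(self, records: List[FakeRecord], **kwargs: Any) -> None:
        for rec in records:
            self.broker.put(json.dumps(rec.to_json()))


def drain(broker: queue.Queue) -> List[Dict[str, Any]]:
    """Separate consumer: reads every pending message (persistence would go here)."""
    messages = []
    while not broker.empty():
        messages.append(json.loads(broker.get()))
    return messages


broker = queue.Queue()
QueuePublisher(broker)._save([FakeRecord("shop", "orders", {"id": 1, "total": 9.5})])
consumed = drain(broker)
```

Because the publisher only serialises and enqueues, retries, buffering and fan-out can live in the consumer without touching the CDC pipeline.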

Gate everything with flags. ITarget exposes two constructor flags:

  • execute_ddl: when True, DDL events (CREATE TABLE, ALTER TABLE, etc.) captured from the source are translated and forwarded to the target via execute(). Set to False if the target schema is managed externally.

  • send_data: when True, DML records are delivered to _save(). Set to False to run in a DDL-only or dry-run mode.

Both default to False so nothing is forwarded until you explicitly opt in.
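
The effect of the two flags can be illustrated with a small stand-in class. GatedTarget and its forward_ddl method are hypothetical and exist only for this sketch; where exactly the library checks execute_ddl is not shown on this page, so treat the DDL half as an assumption. The save()/_save() half mirrors the rule that _save() runs only when send_data is True.

```python
from typing import Any, Dict, List


class GatedTarget:
    """Hypothetical stand-in that mimics the two flags; not the library class."""

    def __init__(self, execute_ddl: bool = False, send_data: bool = False) -> None:
        self.execute_ddl = execute_ddl
        self.send_data = send_data
        self.executed: List[str] = []
        self.saved: List[Dict[str, Any]] = []

    def forward_ddl(self, query: str) -> None:
        # Hypothetical method name: DDL reaches execute() only when execute_ddl is True.
        if self.execute_ddl:
            self.execute(query)

    def execute(self, query: str) -> None:
        self.executed.append(query)

    def save(self, records: List[Dict[str, Any]]) -> None:
        # _save() runs only when send_data is True.
        if self.send_data:
            self._save(records)

    def _save(self, records: List[Dict[str, Any]]) -> None:
        self.saved.extend(records)


def run(target: GatedTarget) -> GatedTarget:
    """Send one DDL statement and one record through the given target."""
    target.forward_ddl("CREATE TABLE orders (id INT)")
    target.save([{"id": 1}])
    return target


dry_run = run(GatedTarget())                                 # defaults: nothing forwarded
ddl_only = run(GatedTarget(execute_ddl=True))                # schema changes only
full = run(GatedTarget(execute_ddl=True, send_data=True))    # DDL and data
```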

Base Interface#

Base target interface for CDC data replication.

class core_cdc.targets.base.ITarget(logger: Logger, execute_ddl: bool = False, send_data: bool = False)

Bases: IFactory, ABC

Base class for concrete target implementations, that is, the destinations to which data is sent or replicated. A target can be a database, a queue, a topic, a data warehouse, etc.

__init__(logger: Logger, execute_ddl: bool = False, send_data: bool = False) -> None

classmethod registration_key() -> str

Return the registration key for this target class.

init_client(**kwargs) -> None

Target implementations must implement this method.

connect() -> None

Connect to the target client if available.

get_ddl_query(event: QueryEvent) -> str

Each engine may use a different query for DDL operations. Returns the query, or None if the operation is not supported.

static get_add_column_ddl(schema: str, table: str, column: str, type_: str) -> str

Return the DDL statement that adds a new column.

static get_create_schema_statement(event: QueryEvent) -> str

Get the DDL statement for creating a schema.

static get_drop_schema_statement(event: QueryEvent) -> str

Get the DDL statement for dropping a schema.

static get_create_table_statement(event: QueryEvent) -> str

Get the DDL statement for creating a table.

static get_alter_table_statement(event: QueryEvent) -> str

Get the DDL statement for altering a table.

static get_schema_table_from_query(query: str) -> Tuple[str, str]

Return the schema and table names extracted from the query with a regular expression.

static get_drop_table_statement(event: QueryEvent) -> str

Get the DDL statement for dropping a table.

execute(query: str) -> None

Execute a query using the target client.

save(records: List[Record], **kwargs) -> None

Save records to the target if send_data is enabled.

close() -> None

Override this method if resources need to be released or closed.

Implementing ITarget#

Subclass ITarget and implement at minimum:

init_client(**kwargs)

Instantiate and store your connector as self.client. Called once before execute() or _save() are invoked.

_save(records, **kwargs)

Receive a batch of Record objects and deliver them to the target. The public save() method calls this only when send_data=True.

All DDL helpers (get_create_table_statement, get_alter_table_statement, etc.) pass event.query through unchanged by default. Override any of them to translate the source SQL dialect into your target's dialect; see the Snowflake example below.
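
To show the pass-through default and an override side by side, here is a minimal sketch that does not import core-cdc. PassThroughTarget is a hypothetical stand-in for the default behaviour described above, BacktickFreeTarget is an illustrative override, and the event is a SimpleNamespace carrying only a query attribute rather than a real QueryEvent.

```python
from types import SimpleNamespace


class PassThroughTarget:
    """Hypothetical stand-in for the default: the source query is returned as-is."""

    @staticmethod
    def get_alter_table_statement(event) -> str:
        return event.query


class BacktickFreeTarget(PassThroughTarget):
    """Illustrative override: drop MySQL-specific backtick quoting."""

    @staticmethod
    def get_alter_table_statement(event) -> str:
        return event.query.replace("`", "")


# Stand-in for a QueryEvent; only the `query` attribute is used here.
event = SimpleNamespace(query="ALTER TABLE `orders` ADD COLUMN `note` TEXT")

default_sql = PassThroughTarget.get_alter_table_statement(event)
translated_sql = BacktickFreeTarget.get_alter_table_statement(event)
```

A real override would usually need more than one substitution, but the shape stays the same: take event.query, return the statement the target understands.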

MySQL#

The MySQLTarget class is provided as an example of how to implement ITarget for a MySQL destination. Inserting CDC records directly into a database couples the pipeline tightly to that database, so the recommended pattern is to forward records to a message queue (SQS, SNS, Kafka, etc.) and let a separate consumer handle persistence. The _save method in the listing sketches both alternatives; use one of them, not both.

Implementing ITarget for MySQL#

from typing import List
import pymysql
from core_cdc.base import Record
from core_cdc.targets.base import ITarget


class MySQLTarget(ITarget):

    def init_client(self, **kwargs) -> None:
        """
        Expecting at least: host, user, password, database.
        More information: https://pymysql.readthedocs.io/en/latest/
        """
        self.connection_settings = kwargs

    def _save(self, records: List[Record], **kwargs) -> None:
        # The two options are alternatives: keep one and delete the other.

        # Option A: forward to a queue for decoupled consumption.
        # `my_queue` is a placeholder for your own queue or topic client.
        for record in records:
            my_queue.send(record.to_json())

        # Option B: write directly with pymysql (coupled).
        # Uncomment this block and remove Option A to use it.
        # with pymysql.connect(**self.connection_settings) as conn:
        #     with conn.cursor() as cursor:
        #         for record in records:
        #             cursor.execute(
        #                 "INSERT INTO ... VALUES (%s, %s, ...)",
        #                 list(record.record.values()),
        #             )
        #     conn.commit()

Snowflake#

core-cdc ships no built-in Snowflake target to avoid forcing a hard dependency on core-db (or any specific Snowflake connector) into every project that uses this library. Instead, the full reference implementation is provided below so you can copy it as-is or adapt it to your own connector.

The design intentionally leaves _save empty and promotes decoupling: rather than writing CDC records directly into Snowflake, the recommended pattern is to forward them to a message queue (SQS, SNS, Kafka, etc.) and let a separate consumer handle persistence. If you prefer a direct write, use client.get_merge_dml as shown in the usage example at the bottom of this section.

Reference implementation#

# -*- coding: utf-8 -*-
import re
from typing import List

from core_db.engines.snowflake_ import SnowflakeClient
from pymysqlreplication.event import QueryEvent

from core_cdc.base import Record
from core_cdc.targets.base import ITarget


class SnowflakeTarget(ITarget):

    def init_client(self, **kwargs) -> None:
        """
        Expecting: account, user, password, database, warehouse, schema, role.
        https://docs.snowflake.com/en/user-guide/python-connector-example.html
        """
        self.client = SnowflakeClient(**kwargs)

    def _save(self, records: List[Record], **kwargs):
        """
        Left empty to promote decoupling. Override in a subclass to forward
        records to a queue or write directly via self.client.get_merge_dml().
        """

    @staticmethod
    def get_add_column_ddl(schema: str, table: str, column: str, type_: str) -> str:
        return f"ALTER TABLE {schema}.{table} ADD COLUMN {column} {type_};"

    @staticmethod
    def get_create_schema_statement(event: QueryEvent) -> str:
        schema = event.schema.decode() if isinstance(event.schema, bytes) else event.schema
        return f"CREATE SCHEMA IF NOT EXISTS {schema};"

    @staticmethod
    def get_create_table_statement(event: QueryEvent) -> str:
        return SnowflakeTarget.transform_create_table_query(event.query.lower())

    @staticmethod
    def transform_create_table_query(query: str) -> str:
        """ Transform a MySQL CREATE TABLE query to Snowflake-compatible DDL. """
        mapper = [
            (r"`", ""),
            (r"create table(?! if not exists)", "create table if not exists"),
            (r"unique index \w+ \(+\w+ \w+\) \w+", ""),
            (r"\bnot null\b", ""),
            (r"\bnull\b", ""),
            (r" generated always as[.]*(.*?)\)", ""),
            (r"auto_increment", ""),
            (r"unsigned", ""),
            (r"zerofill", ""),
            (r" blob", " binary"),
            (r" varbinary[.]*(.*?)\)", " binary"),
            (r" year[.]*(.*?)\)", " varchar(4)"),
            (r" json", " object"),
            (r" enum[.]*(.*?)\)", " varchar"),
            (r" set[.]*(.*?)\)", " varchar"),
            (r"[ ]{2,}", " "),
            (r"[\n]*,", ","),
            (r",,", ","),
            (r",[\n]*\)", ")"),
        ]
        for pattern, replacement in mapper:
            query = re.sub(re.compile(pattern, re.MULTILINE), replacement, query)
        return query
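
To see what the rule table does, the sketch below applies a trimmed copy of it (only a handful of the rules, in the same order) to a made-up MySQL statement. The CREATE TABLE rule here uses a negative lookahead so that a statement that already says IF NOT EXISTS is not rewritten twice. The sample DDL and the function name are assumptions for illustration only.

```python
import re

# Trimmed rule table: a subset of the rules from the reference implementation.
RULES = [
    (r"`", ""),                                                   # drop backtick quoting
    (r"create table(?! if not exists)", "create table if not exists"),
    (r"\bnot null\b", ""),
    (r"auto_increment", ""),
    (r"unsigned", ""),
    (r" json", " object"),
    (r"[ ]{2,}", " "),                                            # collapse leftover spaces
]


def transform_sample(query: str) -> str:
    """Apply each (pattern, replacement) pair in order to a lower-cased query."""
    query = query.lower()
    for pattern, replacement in RULES:
        query = re.sub(pattern, replacement, query)
    return query


mysql_ddl = (
    "CREATE TABLE `orders` (`id` INT UNSIGNED NOT NULL AUTO_INCREMENT, "
    "`meta` JSON, PRIMARY KEY (`id`))"
)
snowflake_ddl = transform_sample(mysql_ddl)
already_guarded = transform_sample("CREATE TABLE IF NOT EXISTS t (id INT)")
```

The order of the rules matters: the space-collapsing rule runs last because the earlier removals leave gaps behind.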

Usage example#

import logging
import os
from pprint import pprint
from typing import Any, List

from core_mixins.logger import get_logger
from pymysqlreplication import BinLogStreamReader

from core_cdc.base import Record
from core_cdc.processors.mysql import MySqlBinlogProcessor

# Paste the SnowflakeTarget class above into your project, then:

logger = get_logger(
    log_level=int(os.getenv("LOGGER_LEVEL", str(logging.INFO))),
    reset_handlers=True)

class CustomMySqlBinlogProcessor(MySqlBinlogProcessor):
    def process_dml_event(self, event: Any, **kwargs) -> List[Record]:
        recs = super().process_dml_event(event, **kwargs)
        logger.info("The following records will be processed...")
        for rec in recs:
            pprint(rec.to_json())
        return recs

    def _update_log_file(self, log_file_name: str):
        # persist to your storage (DB, Redis, file, etc.)
        ...

    def _update_log_pos(self, position: int):
        # persist to your storage (DB, Redis, file, etc.)
        ...

class CustomTarget(SnowflakeTarget):
    def _save(self, records: List[Record], **kwargs):
        if not records:
            # Nothing to merge; records[0] below would raise IndexError.
            return

        logger.info(f"Saving: {records}")
        # Assumes every record in the batch belongs to the same table
        # and that the primary key column is named "id".
        query, params = self.client.get_merge_dml(
            target=records[0].table_name,
            columns=list(records[0].record.keys()),
            pk_ids=["id"],
            records=[rec.record for rec in records],
        )
        self.execute(f"USE SCHEMA {records[0].schema_name}")
        self.client.execute(query, params=params)
        self.client.commit()

    def execute(self, query: str):
        self.client.execute(query)

CONF = {
    "account":   os.getenv("SNOWFLAKE_ACCOUNT", ""),
    "user":      os.getenv("SNOWFLAKE_USER", ""),
    "password":  os.getenv("SNOWFLAKE_PASSWORD", ""),
    "database":  os.getenv("SNOWFLAKE_DATABASE", ""),
    "schema":    os.getenv("SNOWFLAKE_SCHEMA", "PUBLIC"),
    "warehouse": os.getenv("SNOWFLAKE_WAREHOUSE", ""),
    "role":      os.getenv("SNOWFLAKE_ROLE", ""),
}
cxn_params = {"host": "localhost", "port": 3306,
              "user": "root", "passwd": "mysql_password"}

target = CustomTarget(logger=logger, execute_ddl=True, send_data=True)
target.init_client(**CONF)
target.client.connect()
target.client.execute(f"USE {CONF['database']}")

stream = BinLogStreamReader(
    resume_stream=False,
    connection_settings=cxn_params,
    blocking=True,
    freeze_schema=False,
    server_id=1,
)
processor = CustomMySqlBinlogProcessor(
    stream=stream,
    targets=[target],
    connection_settings=cxn_params,
    service=os.getenv("SERVICE_NAME", "my-cdc-service"),
    logger=logger,
)
processor.execute()
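
The two _update_log_* hooks in the usage example are left as stubs. One possible way to fill them in, sketched here under the assumption that a local JSON file is an acceptable store, is a small checkpoint helper. FileCheckpoint and its method names are hypothetical and not part of core-cdc; the file name and position values are made up.

```python
import json
import os
import tempfile
from typing import Optional, Tuple


class FileCheckpoint:
    """Hypothetical helper that stores the binlog file name and position as JSON."""

    def __init__(self, path: str) -> None:
        self.path = path

    def _read(self) -> dict:
        if not os.path.exists(self.path):
            return {}
        with open(self.path, "r", encoding="utf-8") as fh:
            return json.load(fh)

    def _write(self, state: dict) -> None:
        # Write to a temporary file and rename it, so a crash cannot
        # leave a half-written checkpoint behind.
        tmp_path = f"{self.path}.tmp"
        with open(tmp_path, "w", encoding="utf-8") as fh:
            json.dump(state, fh)
        os.replace(tmp_path, self.path)

    def update_log_file(self, log_file_name: str) -> None:
        self._write({**self._read(), "log_file": log_file_name})

    def update_log_pos(self, position: int) -> None:
        self._write({**self._read(), "log_pos": position})

    def load(self) -> Tuple[Optional[str], Optional[int]]:
        state = self._read()
        return state.get("log_file"), state.get("log_pos")


checkpoint = FileCheckpoint(os.path.join(tempfile.mkdtemp(), "binlog_checkpoint.json"))
checkpoint.update_log_file("mysql-bin.000003")
checkpoint.update_log_pos(1547)
restored = checkpoint.load()
```

Inside CustomMySqlBinlogProcessor, _update_log_file and _update_log_pos could delegate to update_log_file and update_log_pos. On restart, the restored values could be passed to BinLogStreamReader through its log_file and log_pos arguments together with resume_stream=True.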