Targets
A target is the destination where CDC events are delivered after the processor captures them. It can be another database, a data warehouse, a message queue, an SNS topic, a file, or anything else your pipeline needs to write to.
core-cdc does not ship built-in target implementations. This is intentional:
bundling specific connectors (Snowflake, MySQL, Kafka, …) would force every user to
install dependencies they may not need and couple the library to third-party client
libraries. Instead, ITarget defines a thin contract that you implement once in
your own project using whatever connector fits your stack.
Design philosophy
Decouple when possible. Directly writing CDC records into a target database
inside _save is the simplest approach, but it creates tight coupling and makes
it harder to add buffering, retries, or fan-out. The recommended pattern is to
publish each Record (via record.to_json()) to a message queue or topic and
let a separate consumer handle persistence. This keeps the CDC pipeline stateless
and easy to scale.
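Here is a minimal sketch of that producer/consumer split. It uses Python's in-process queue.Queue as a stand-in for SQS, SNS, or Kafka. FakeRecord is a hypothetical placeholder for core_cdc.base.Record that only provides to_json(). Whether the real to_json() returns a dict or a string is an assumption here, so the producer serializes non-string payloads with json.dumps.

```python
import json
import queue
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class FakeRecord:
    """Hypothetical stand-in for core_cdc.base.Record; only to_json() is used."""
    table_name: str
    record: Dict[str, Any] = field(default_factory=dict)

    def to_json(self) -> Dict[str, Any]:
        return {"table_name": self.table_name, "record": self.record}


def publish(records: List[Any], q: "queue.Queue[str]") -> int:
    """Producer side: what a decoupled _save() might do, one message per record."""
    for rec in records:
        payload = rec.to_json()
        # Serialize only if to_json() did not already return a string (assumption).
        q.put(payload if isinstance(payload, str) else json.dumps(payload))
    return len(records)


def drain(q: "queue.Queue[str]") -> List[Dict[str, Any]]:
    """Consumer side: a separate process would persist these messages."""
    messages = []
    while True:
        try:
            messages.append(json.loads(q.get_nowait()))
        except queue.Empty:
            return messages


if __name__ == "__main__":
    q: "queue.Queue[str]" = queue.Queue()
    publish([FakeRecord("users", {"id": 1}), FakeRecord("users", {"id": 2})], q)
    print(drain(q))
```

Because the producer only serializes and enqueues, retries, buffering, and fan-out become concerns of the queue and its consumers rather than of the CDC process.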
Gate everything with flags. ITarget exposes two constructor flags:
- execute_ddl: when True, DDL events (CREATE TABLE, ALTER TABLE, etc.) captured from the source are translated and forwarded to the target via execute(). Set to False if the target schema is managed externally.
- send_data: when True, DML records are delivered to _save(). Set to False to run in a DDL-only or dry-run mode.
Both default to False so nothing is forwarded until you explicitly opt in.
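The hypothetical GatedTarget class below illustrates the gating. It is not core-cdc's ITarget. It only reproduces the documented behavior: DDL reaches execute() only when execute_ddl=True, and records reach _save() only when send_data=True. The method name handle_ddl is invented for this sketch.

```python
import logging
from typing import Any, List


class GatedTarget:
    """Hypothetical, simplified illustration of the two constructor flags."""

    def __init__(self, logger: logging.Logger, execute_ddl: bool = False,
                 send_data: bool = False) -> None:
        self.logger = logger
        self.execute_ddl = execute_ddl
        self.send_data = send_data
        self.executed: List[str] = []   # DDL that reached execute()
        self.saved: List[Any] = []      # records that reached _save()

    def handle_ddl(self, query: str) -> None:  # hypothetical name
        if self.execute_ddl:
            self.execute(query)
        else:
            self.logger.debug("DDL skipped (execute_ddl=False): %s", query)

    def save(self, records: List[Any]) -> None:
        if self.send_data:
            self._save(records)
        else:
            self.logger.debug("%d records skipped (send_data=False)", len(records))

    def execute(self, query: str) -> None:
        self.executed.append(query)

    def _save(self, records: List[Any]) -> None:
        self.saved.extend(records)


if __name__ == "__main__":
    dry_run = GatedTarget(logging.getLogger("demo"))  # both flags default to False
    dry_run.handle_ddl("CREATE TABLE t (id INT)")
    dry_run.save([{"id": 1}])
    print(dry_run.executed, dry_run.saved)  # [] []
```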
Base Interface
Base target interface for CDC data replication.
- class core_cdc.targets.base.ITarget(logger: Logger, execute_ddl: bool = False, send_data: bool = False)
Bases: IFactory, ABC
This is the base class for specific target implementations, that is, the destinations to which data is sent or replicated. A target could be a database, queue, topic, data warehouse, etc.
- get_ddl_query(event: QueryEvent) -> str
Each engine may use a different query for DDL operations. Returns the query, or None if the operation is not supported.
- static get_add_column_ddl(schema: str, table: str, column: str, type_: str) -> str
Returns the DDL to add a new column.
- static get_create_schema_statement(event: QueryEvent) -> str
Get the DDL statement for creating a schema.
- static get_drop_schema_statement(event: QueryEvent) -> str
Get the DDL statement for dropping a schema.
- static get_create_table_statement(event: QueryEvent) -> str
Get the DDL statement for creating a table.
- static get_alter_table_statement(event: QueryEvent) -> str
Get the DDL statement for altering a table.
- static get_schema_table_from_query(query: str) -> Tuple[str, str]
Returns the (schema, table) pair parsed from the query with a regular expression.
- static get_drop_table_statement(event: QueryEvent) -> str
Get the DDL statement for dropping a table.
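The signatures above do not show how get_ddl_query routes a statement or which regex get_schema_table_from_query uses. The following is a hypothetical illustration only, not core-cdc's internal logic. A router could classify a DDL statement by its first two keywords, and a regular expression could extract the optionally backtick-quoted schema and table names.

```python
import re
from typing import Optional, Tuple

# Hypothetical pattern, not the one core-cdc uses internally.
_TABLE_RE = re.compile(
    r"^\s*(?:create|alter|drop)\s+table\s+"   # statement keyword
    r"(?:if\s+(?:not\s+)?exists\s+)?"         # optional IF [NOT] EXISTS
    r"`?(?:(?P<schema>\w+)`?\.`?)?"           # optional schema qualifier
    r"(?P<table>\w+)`?",                      # table name
    re.IGNORECASE,
)

_DDL_KINDS = {
    "create_schema", "create_database", "drop_schema", "drop_database",
    "create_table", "alter_table", "drop_table",
}


def ddl_kind(query: str) -> Optional[str]:
    """Classify a statement by its first two keywords, e.g. 'alter_table'."""
    words = query.split()
    if len(words) < 2:
        return None
    kind = f"{words[0]}_{words[1]}".lower()
    return kind if kind in _DDL_KINDS else None


def schema_table_from_query(query: str, default_schema: str = "") -> Tuple[str, str]:
    """Return (schema, table), falling back to default_schema if unqualified."""
    match = _TABLE_RE.match(query)
    if match is None:
        raise ValueError(f"not a table DDL statement: {query!r}")
    return match.group("schema") or default_schema, match.group("table")
```

A dispatcher could then map each kind to the matching static helper, for example "alter_table" to get_alter_table_statement.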
Implementing ITarget
Subclass ITarget and implement at minimum:
- init_client(**kwargs): instantiate your connector and store it as self.client. Call it once, before execute() or _save() is invoked.
- _save(records, **kwargs): receive a batch of Record objects and deliver them to the target. The public save() method calls this only when send_data=True.
The event-based DDL helpers (get_create_table_statement, get_alter_table_statement, etc.)
pass event.query through unchanged by default. Override any of them to translate
the source SQL dialect into your target's dialect; see the Snowflake example below.
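To illustrate the pass-through default versus an override, here is a small sketch. Both classes are hypothetical stand-ins rather than subclasses of the real ITarget, and the PostgreSQL target is only an example dialect. SimpleNamespace mimics the schema and query attributes of a pymysqlreplication QueryEvent.

```python
import re
from types import SimpleNamespace


class PassThroughDDL:
    """Hypothetical stand-in for the documented default: return event.query as-is."""

    @staticmethod
    def get_create_table_statement(event) -> str:
        return event.query


class PostgresDDL(PassThroughDDL):
    """Illustrative override for a hypothetical PostgreSQL target."""

    @staticmethod
    def get_create_table_statement(event) -> str:
        # MySQL quotes identifiers with backticks; PostgreSQL uses double quotes.
        query = event.query.replace("`", '"')
        # PostgreSQL has no AUTO_INCREMENT keyword. This sketch simply drops it;
        # a real translation would turn the column into an identity column.
        return re.sub(r"\s+auto_increment\b", "", query, flags=re.IGNORECASE)


if __name__ == "__main__":
    event = SimpleNamespace(
        schema=b"shop", query="CREATE TABLE `orders` (`id` INT AUTO_INCREMENT)"
    )
    print(PassThroughDDL.get_create_table_statement(event))
    # CREATE TABLE `orders` (`id` INT AUTO_INCREMENT)
    print(PostgresDDL.get_create_table_statement(event))
    # CREATE TABLE "orders" ("id" INT)
```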
MySQL
The MySQLTarget class below shows how ITarget can be implemented for a MySQL
destination. Its _save method sketches two alternatives. Option A follows the
recommended pattern: it decouples the pipeline by forwarding records to a message
queue (SQS, SNS, Kafka, etc.) and lets a separate consumer handle persistence.
Option B inserts CDC records directly into the database, which couples the
pipeline too tightly.
Implementing ITarget for MySQL
```python
from typing import List

import pymysql  # needed only for Option B

from core_cdc.base import Record
from core_cdc.targets.base import ITarget


class MySQLTarget(ITarget):
    def init_client(self, **kwargs) -> None:
        """
        Expecting at least: host, user, password, database.
        More information: https://pymysql.readthedocs.io/en/latest/
        """
        self.connection_settings = kwargs

    def _save(self, records: List[Record], **kwargs) -> None:
        # Option A (recommended): forward each record to a queue for decoupled
        # consumption. `my_queue` is a placeholder for your own producer client.
        for record in records:
            my_queue.send(record.to_json())

        # Option B (coupled): write directly with pymysql instead of Option A.
        # Use one option or the other, not both.
        # with pymysql.connect(**self.connection_settings) as conn:
        #     with conn.cursor() as cursor:
        #         for record in records:
        #             cursor.execute(
        #                 "INSERT INTO ... VALUES (%s, %s, ...)",
        #                 list(record.record.values()),
        #             )
        #     conn.commit()
```
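The INSERT statement in Option B is left elided. One hypothetical way to generate such a statement from a record's column mapping (the record.record dictionary used above) is to backtick-quote the identifiers and emit one %s placeholder per value. pymysql can then bind the values safely. build_insert and quote_ident are illustrative names, not part of core-cdc or pymysql.

```python
from typing import Any, Dict, List, Tuple


def quote_ident(name: str) -> str:
    """Backtick-quote a MySQL identifier, doubling any embedded backticks."""
    return "`" + name.replace("`", "``") + "`"


def build_insert(schema: str, table: str, row: Dict[str, Any]) -> Tuple[str, List[Any]]:
    """Build a parameterized INSERT for one CDC row (hypothetical helper)."""
    if not row:
        raise ValueError("row must contain at least one column")
    # Identifiers cannot be bound as parameters, so they are quoted instead;
    # values are left to pymysql through %s placeholders.
    columns = ", ".join(quote_ident(name) for name in row)
    placeholders = ", ".join(["%s"] * len(row))
    sql = (
        f"INSERT INTO {quote_ident(schema)}.{quote_ident(table)} "
        f"({columns}) VALUES ({placeholders})"
    )
    return sql, list(row.values())


if __name__ == "__main__":
    print(build_insert("shop", "orders", {"id": 1, "total": 9.5}))
    # ('INSERT INTO `shop`.`orders` (`id`, `total`) VALUES (%s, %s)', [1, 9.5])
```

Inside Option B, the result could be passed straight to cursor.execute(sql, params). A plain INSERT fails on replayed or updated rows, so a real consumer would likely need an upsert strategy.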
Snowflake
core-cdc ships no built-in Snowflake target, so that projects using this library
are not forced to take a hard dependency on core-db (or on any specific Snowflake
connector). Instead, the full reference implementation is provided below so you
can copy it as-is or adapt it to your own connector.
The design intentionally leaves _save empty and promotes decoupling: rather
than writing CDC records directly into Snowflake, the recommended pattern is to
forward them to a message queue (SQS, SNS, Kafka, etc.) and let a separate consumer
handle persistence. If you prefer a direct write, use client.get_merge_dml as
shown in the usage example at the bottom of this section.
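get_merge_dml belongs to core-db, and its exact output is not documented here. The hypothetical build_merge function below illustrates the general idea behind a merge-based direct write. It produces a Snowflake MERGE statement that upserts a batch of rows from an inline VALUES clause, whose columns Snowflake exposes as column1, column2, and so on. The %s placeholders are left for the connector to bind. This sketch makes several simplifying assumptions: it does not quote identifiers and does not handle DELETE events.

```python
from typing import Any, Dict, List, Sequence, Tuple


def build_merge(table: str, columns: Sequence[str], pk_ids: Sequence[str],
                records: List[Dict[str, Any]]) -> Tuple[str, List[Any]]:
    """Hypothetical MERGE builder; not core-db's get_merge_dml."""
    if not records:
        raise ValueError("records must not be empty")
    # Inline VALUES rows are exposed as column1, column2, ...; alias them.
    select_list = ", ".join(f"column{i} AS {c}" for i, c in enumerate(columns, start=1))
    row = "(" + ", ".join(["%s"] * len(columns)) + ")"
    values = ", ".join([row] * len(records))
    on = " AND ".join(f"tgt.{pk} = src.{pk}" for pk in pk_ids)
    parts = [
        f"MERGE INTO {table} AS tgt",
        f"USING (SELECT {select_list} FROM VALUES {values}) AS src",
        f"ON {on}",
    ]
    updates = ", ".join(f"tgt.{c} = src.{c}" for c in columns if c not in pk_ids)
    if updates:  # a table made only of key columns has nothing to update
        parts.append(f"WHEN MATCHED THEN UPDATE SET {updates}")
    parts.append(
        f"WHEN NOT MATCHED THEN INSERT ({', '.join(columns)}) "
        f"VALUES ({', '.join('src.' + c for c in columns)})"
    )
    # Parameters are flattened row by row, matching the placeholder order.
    params = [rec[c] for rec in records for c in columns]
    return " ".join(parts), params
```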
Reference implementation
```python
# -*- coding: utf-8 -*-

import re
from typing import List

from core_db.engines.snowflake_ import SnowflakeClient
from pymysqlreplication.event import QueryEvent

from core_cdc.base import Record
from core_cdc.targets.base import ITarget


class SnowflakeTarget(ITarget):
    def init_client(self, **kwargs) -> None:
        """
        Expecting: account, user, password, database, warehouse, schema, role.
        https://docs.snowflake.com/en/user-guide/python-connector-example.html
        """
        self.client = SnowflakeClient(**kwargs)

    def _save(self, records: List[Record], **kwargs):
        """
        Left empty to promote decoupling. Override in a subclass to forward
        records to a queue or write directly via self.client.get_merge_dml().
        """

    @staticmethod
    def get_add_column_ddl(schema: str, table: str, column: str, type_: str) -> str:
        return f"ALTER TABLE {schema}.{table} ADD COLUMN {column} {type_};"

    @staticmethod
    def get_create_schema_statement(event: QueryEvent) -> str:
        schema = event.schema.decode() if isinstance(event.schema, bytes) else event.schema
        return f"CREATE SCHEMA IF NOT EXISTS {schema};"

    @staticmethod
    def get_create_table_statement(event: QueryEvent) -> str:
        return SnowflakeTarget.transform_create_table_query(event.query.lower())

    @staticmethod
    def transform_create_table_query(query: str) -> str:
        """Transform a MySQL CREATE TABLE query into Snowflake-compatible DDL."""
        mapper = [
            (r"`", ""),
            # Skip queries that already say IF NOT EXISTS so it is not doubled.
            (r"create table(?!\s+if not exists\b)\s+", "create table if not exists "),
            (r"unique index \w+ \(+\w+ \w+\) \w+", ""),
            (r"\bnot null\b", ""),
            (r"\bnull\b", ""),
            (r" generated always as[.]*(.*?)\)", ""),
            (r"auto_increment", ""),
            (r"unsigned", ""),
            (r"zerofill", ""),
            (r" blob", " binary"),
            (r" varbinary[.]*(.*?)\)", " binary"),
            (r" year[.]*(.*?)\)", " varchar(4)"),
            (r" json", " object"),
            (r" enum[.]*(.*?)\)", " varchar"),
            (r" set[.]*(.*?)\)", " varchar"),
            (r"[ ]{2,}", " "),
            (r"[\n]*,", ","),
            (r",,", ","),
            (r",[\n]*\)", ")"),
        ]

        for pattern, replacement in mapper:
            query = re.sub(pattern, replacement, query, flags=re.MULTILINE)

        return query
```
Usage example
```python
import logging
import os
from pprint import pprint
from typing import Any, List

from core_mixins.logger import get_logger
from pymysqlreplication import BinLogStreamReader

from core_cdc.base import Record
from core_cdc.processors.mysql import MySqlBinlogProcessor

# Paste the SnowflakeTarget class above into your project (or import it), then:

logger = get_logger(
    log_level=int(os.getenv("LOGGER_LEVEL", str(logging.INFO))),
    reset_handlers=True,
)


class CustomMySqlBinlogProcessor(MySqlBinlogProcessor):
    def process_dml_event(self, event: Any, **kwargs) -> List[Record]:
        recs = super().process_dml_event(event, **kwargs)
        logger.info("The following records will be processed...")
        for rec in recs:
            pprint(rec.to_json())

        return recs

    def _update_log_file(self, log_file_name: str):
        # Persist to your storage (DB, Redis, file, etc.).
        ...

    def _update_log_pos(self, position: int):
        # Persist to your storage (DB, Redis, file, etc.).
        ...


class CustomTarget(SnowflakeTarget):
    def _save(self, records: List[Record], **kwargs):
        if not records:
            return

        logger.info(f"Saving: {records}")

        # Assumes every record in the batch belongs to the same table and
        # that "id" is its primary key; adjust pk_ids to your schema.
        query, params = self.client.get_merge_dml(
            target=records[0].table_name,
            columns=list(records[0].record.keys()),
            pk_ids=["id"],
            records=[rec.record for rec in records],
        )

        self.execute(f"USE SCHEMA {records[0].schema_name}")
        self.client.execute(query, params=params)
        self.client.commit()

    def execute(self, query: str):
        self.client.execute(query)


CONF = {
    "account": os.getenv("SNOWFLAKE_ACCOUNT", ""),
    "user": os.getenv("SNOWFLAKE_USER", ""),
    "password": os.getenv("SNOWFLAKE_PASSWORD", ""),
    "database": os.getenv("SNOWFLAKE_DATABASE", ""),
    "schema": os.getenv("SNOWFLAKE_SCHEMA", "PUBLIC"),
    "warehouse": os.getenv("SNOWFLAKE_WAREHOUSE", ""),
    "role": os.getenv("SNOWFLAKE_ROLE", ""),
}

# MySQL source connection; replace the credentials with your own.
cxn_params = {
    "host": "localhost",
    "port": 3306,
    "user": "root",
    "passwd": "mysql_password",
}

target = CustomTarget(logger=logger, execute_ddl=True, send_data=True)
target.init_client(**CONF)
target.client.connect()
target.client.execute(f"USE {CONF['database']}")

stream = BinLogStreamReader(
    resume_stream=False,
    connection_settings=cxn_params,
    blocking=True,
    freeze_schema=False,
    server_id=1,
)

processor = CustomMySqlBinlogProcessor(
    stream=stream,
    targets=[target],
    connection_settings=cxn_params,
    service=os.getenv("SERVICE_NAME", "my-cdc-service"),
    logger=logger,
)

processor.execute()
```
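The _update_log_file and _update_log_pos hooks above are left as stubs. One possible backing store is the hypothetical FileCheckpoint class below. It keeps the binlog file name and position in a small JSON file and rewrites that file atomically through a temporary file and os.replace, so a crash mid-write does not leave a truncated checkpoint.

```python
import json
import os
import tempfile
from typing import Optional


class FileCheckpoint:
    """Hypothetical JSON-file store for the binlog file name and position."""

    def __init__(self, path: str) -> None:
        self.path = path
        self.log_file: Optional[str] = None
        self.log_pos: Optional[int] = None
        if os.path.exists(path):
            with open(path, encoding="utf-8") as fh:
                data = json.load(fh)
            self.log_file, self.log_pos = data["log_file"], data["log_pos"]

    def update_file(self, log_file_name: str) -> None:
        self.log_file = log_file_name
        self._write()

    def update_pos(self, position: int) -> None:
        self.log_pos = position
        self._write()

    def _write(self) -> None:
        # Write to a temp file in the same directory, then atomically swap it in.
        directory = os.path.dirname(os.path.abspath(self.path))
        fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
        with os.fdopen(fd, "w", encoding="utf-8") as fh:
            json.dump({"log_file": self.log_file, "log_pos": self.log_pos}, fh)
        os.replace(tmp_path, self.path)
```

The processor hooks could delegate to checkpoint.update_file(log_file_name) and checkpoint.update_pos(position). On restart, the stored values could be passed to BinLogStreamReader through its log_file and log_pos arguments together with resume_stream=True. Writing on every event adds I/O. Batching the updates is a reasonable trade-off if replaying a few events after a crash is acceptable.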