.. module:: Targets

Targets
=========================

A **target** is the destination where CDC events are delivered after the processor
captures them. It can be another database, a data warehouse, a message queue, an SNS
topic, a file, or anything else your pipeline needs to write to.

``core-cdc`` does **not** ship built-in target implementations. This is intentional:
bundling specific connectors (Snowflake, MySQL, Kafka, …) would force every user to
install dependencies they may not need, and would couple the library to third-party
client libraries. Instead, ``ITarget`` defines a thin contract that you implement once
in your own project, using whatever connector fits your stack.

Design philosophy
-----------------------------------------------------------------------------------

**Decouple when possible.** Writing CDC records directly into a target database inside
``_save`` is the simplest approach, but it creates tight coupling and makes it harder to
add buffering, retries, or fan-out. The recommended pattern is to publish each
``Record`` (via ``record.to_json()``) to a message queue or topic and let a separate
consumer handle persistence. This keeps the CDC pipeline stateless and easy to scale.

**Gate everything with flags.** ``ITarget`` exposes two constructor flags:

- ``execute_ddl``: when ``True``, DDL events (``CREATE TABLE``, ``ALTER TABLE``, etc.)
  captured from the source are translated and forwarded to the target via
  ``execute()``. Set it to ``False`` if the target schema is managed externally.
- ``send_data``: when ``True``, DML records are delivered to ``_save()``. Set it to
  ``False`` to run in DDL-only or dry-run mode.

Both default to ``False``, so nothing is forwarded until you explicitly opt in.
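
The two rules above can be illustrated with a minimal sketch. It uses a hypothetical
``SketchTarget`` class, not the real ``ITarget`` (whose actual internals are documented
in the API reference and may differ), and it makes a few assumptions: plain
dictionaries stand in for ``Record`` objects, an in-memory ``queue.Queue`` stands in
for SQS, SNS, or Kafka, ``handle_ddl`` is an invented entry point for DDL, and DDL
translation is omitted.

.. code-block:: python

    import json
    import queue
    from typing import Any, Dict, List


    class SketchTarget:
        """Hypothetical stand-in for ITarget; NOT the core-cdc implementation.

        It only mirrors the documented behaviour of ``execute_ddl`` and ``send_data``.
        """

        def __init__(self, execute_ddl: bool = False, send_data: bool = False) -> None:
            # Both flags default to False, so nothing is forwarded until opted in.
            self.execute_ddl = execute_ddl
            self.send_data = send_data
            # In-memory queue standing in for SQS, SNS, Kafka, etc.
            self.outbox: queue.Queue = queue.Queue()
            # Records every DDL statement that reached execute().
            self.ddl_log: List[str] = []

        def save(self, records: List[Dict[str, Any]]) -> None:
            # DML records reach _save() only when send_data is True.
            if self.send_data:
                self._save(records)

        def handle_ddl(self, query: str) -> None:
            # DDL reaches execute() only when execute_ddl is True.
            if self.execute_ddl:
                self.execute(query)

        def _save(self, records: List[Dict[str, Any]]) -> None:
            # Decoupled delivery: publish one JSON message per record and let a
            # separate consumer persist it.
            for record in records:
                self.outbox.put(json.dumps(record))

        def execute(self, query: str) -> None:
            self.ddl_log.append(query)


    dry_run = SketchTarget()  # both flags False
    dry_run.save([{"id": 1}])
    dry_run.handle_ddl("CREATE TABLE t (id INT)")
    print(dry_run.outbox.qsize(), dry_run.ddl_log)  # -> 0 []

With the defaults, both calls are no-ops; enabling one flag opens only the matching
path. Because ``_save`` only enqueues JSON strings, buffering, retries, and fan-out can
be handled on the consumer side without touching the CDC process.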

Base Interface
-----------------------------------------------------------------------------------

.. automodule:: core_cdc.targets.base
   :members:
   :undoc-members:
   :show-inheritance:

Implementing ``ITarget``
-----------------------------------------------------------------------------------

Subclass ``ITarget`` and implement at least:

``init_client(**kwargs)``
    Instantiate your connector and store it as ``self.client``. It must run once,
    before ``execute()`` or ``_save()`` is invoked (the usage example at the end of
    this page calls it explicitly).

``_save(records, **kwargs)``
    Receive a batch of :class:`~core_cdc.base.Record` objects and deliver them to the
    target. The public ``save()`` method calls it only when ``send_data=True``.

All DDL helpers (``get_create_table_statement``, ``get_alter_table_statement``, etc.)
pass ``event.query`` through unchanged by default. Override any of them to translate
the source SQL dialect into your target's dialect; see the Snowflake example below.

MySQL
-----------------------------------------------------------------------------------

The ``MySQLTarget`` class below is an example of how to implement ``ITarget`` for a
MySQL destination. Its ``_save`` shows two options. Option A, the recommended one,
forwards records to a message queue (SQS, SNS, Kafka, etc.) and lets a separate
consumer handle persistence. Option B inserts them directly with ``pymysql``, which
couples the pipeline tightly to the database.

Implementing ``ITarget`` for MySQL
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. code-block:: python

    from typing import List

    import pymysql

    from core_cdc.base import Record
    from core_cdc.targets.base import ITarget


    class MySQLTarget(ITarget):

        def init_client(self, **kwargs) -> None:
            """
            Expecting at least: host, user, password, database.
            More information: https://pymysql.readthedocs.io/en/latest/
            """
            self.connection_settings = kwargs

        def _save(self, records: List[Record], **kwargs) -> None:
            # Option A (recommended): forward each record to a queue for decoupled
            # consumption. ``my_queue`` is a placeholder for your own queue client.
            for record in records:
                my_queue.send(record.to_json())

            # Option B (coupled): write directly with pymysql. Use it *instead of*
            # Option A; running both would deliver every record twice.
            # with pymysql.connect(**self.connection_settings) as conn:
            #     with conn.cursor() as cursor:
            #         for record in records:
            #             cursor.execute(
            #                 "INSERT INTO ... VALUES (%s, %s, ...)",
            #                 list(record.record.values()),
            #             )
            #     conn.commit()

Snowflake
-----------------------------------------------------------------------------------

``core-cdc`` ships no built-in Snowflake target, to avoid forcing a hard dependency on
``core-db`` (or any specific Snowflake connector) on every project that uses this
library. Instead, the full reference implementation is provided below so you can copy
it as-is or adapt it to your own connector.

The design intentionally leaves ``_save`` empty and promotes **decoupling**: rather than
writing CDC records directly into Snowflake, the recommended pattern is to forward them
to a message queue (SQS, SNS, Kafka, etc.) and let a separate consumer handle
persistence. If you prefer a direct write, use ``client.get_merge_dml`` as shown in the
usage example at the end of this section.

Reference implementation
~~~~~~~~~~~~~~~~~~~~~~~~~~

.. code-block:: python

    # -*- coding: utf-8 -*-

    import re
    from typing import List

    from core_db.engines.snowflake_ import SnowflakeClient
    from pymysqlreplication.event import QueryEvent

    from core_cdc.base import Record
    from core_cdc.targets.base import ITarget


    class SnowflakeTarget(ITarget):

        def init_client(self, **kwargs) -> None:
            """
            Expecting: account, user, password, database, warehouse, schema, role.
            https://docs.snowflake.com/en/user-guide/python-connector-example.html
            """
            self.client = SnowflakeClient(**kwargs)

        def _save(self, records: List[Record], **kwargs):
            """
            Left empty to promote decoupling.
            Override in a subclass to forward records to a queue or
            write directly via self.client.get_merge_dml().
            """

        @staticmethod
        def get_add_column_ddl(schema: str, table: str, column: str, type_: str) -> str:
            return f"ALTER TABLE {schema}.{table} ADD COLUMN {column} {type_};"

        @staticmethod
        def get_create_schema_statement(event: QueryEvent) -> str:
            schema = event.schema.decode() if isinstance(event.schema, bytes) else event.schema
            return f"CREATE SCHEMA IF NOT EXISTS {schema};"

        @staticmethod
        def get_create_table_statement(event: QueryEvent) -> str:
            return SnowflakeTarget.transform_create_table_query(event.query.lower())

        @staticmethod
        def transform_create_table_query(query: str) -> str:
            """
            Transform a MySQL CREATE TABLE query to Snowflake-compatible DDL.
            """

            mapper = [
                (r"`", ""),
                (r"create table", "create table if not exists"),
                (r"unique index \w+ \(+\w+ \w+\) \w+", ""),
                (r"\bnot null\b", ""),
                (r"\bnull\b", ""),
                (r" generated always as[.]*(.*?)\)", ""),
                (r"auto_increment", ""),
                (r"unsigned", ""),
                (r"zerofill", ""),
                (r" blob", " binary"),
                (r" varbinary[.]*(.*?)\)", " binary"),
                (r" year[.]*(.*?)\)", " varchar(4)"),
                (r" json", " object"),
                (r" enum[.]*(.*?)\)", " varchar"),
                (r" set[.]*(.*?)\)", " varchar"),
                (r"[ ]{2,}", " "),
                (r"[\n]*,", ","),
                (r",,", ","),
                (r",[\n]*\)", ")"),
            ]

            for pattern, replacement in mapper:
                query = re.sub(re.compile(pattern, re.MULTILINE), replacement, query)

            return query
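
The ordered-rewrite technique behind ``transform_create_table_query`` can be tried
without ``core-db`` or a binlog event. The sketch below copies a *subset* of the
``mapper`` rules into a standalone ``transform`` function (a name chosen only for this
example) and runs it on a sample statement. Because only a subset is copied, its output
can differ from the full mapper for constructs not covered here (``enum``, ``year``,
``blob``, unique indexes, …).

.. code-block:: python

    import re
    from typing import List, Tuple

    # Subset of the rules in SnowflakeTarget.transform_create_table_query,
    # copied here so the rewrite can be tried in isolation.
    RULES: List[Tuple[str, str]] = [
        (r"`", ""),
        (r"create table", "create table if not exists"),
        (r"\bnot null\b", ""),
        (r"\bnull\b", ""),
        (r"auto_increment", ""),
        (r"unsigned", ""),
        (r" json", " object"),
        (r"[ ]{2,}", " "),  # collapse the gaps left by removed tokens
    ]


    def transform(query: str) -> str:
        """Lower-case the query, then apply each (pattern, replacement) in order."""
        query = query.lower()  # get_create_table_statement lower-cases first too
        for pattern, replacement in RULES:
            query = re.sub(re.compile(pattern, re.MULTILINE), replacement, query)
        return query


    mysql_ddl = "CREATE TABLE `users` (`id` INT UNSIGNED NOT NULL AUTO_INCREMENT, `meta` JSON)"
    print(transform(mysql_ddl))
    # -> create table if not exists users (id int , meta object)

Order matters: ``\bnot null\b`` runs before ``\bnull\b``; the other way round, the
``null`` inside ``not null`` would be stripped first and leave a stray ``not`` behind.
The space left before the comma (``int ,``) is harmless SQL, but a rule such as
``(r" ,", ",")`` could be appended if you want tidier output.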

Usage example
~~~~~~~~~~~~~~

.. code-block:: python

    import logging
    import os
    from pprint import pprint
    from typing import Any, List

    from core_mixins.logger import get_logger
    from pymysqlreplication import BinLogStreamReader

    from core_cdc.base import Record
    from core_cdc.processors.mysql import MySqlBinlogProcessor

    # Paste the SnowflakeTarget class above into your project, then:

    logger = get_logger(
        log_level=int(os.getenv("LOGGER_LEVEL", str(logging.INFO))),
        reset_handlers=True,
    )


    class CustomMySqlBinlogProcessor(MySqlBinlogProcessor):

        def process_dml_event(self, event: Any, **kwargs) -> List[Record]:
            recs = super().process_dml_event(event, **kwargs)
            logger.info("The following records will be processed...")
            for rec in recs:
                pprint(rec.to_json())

            return recs

        def _update_log_file(self, log_file_name: str):
            # persist to your storage (DB, Redis, file, etc.)
            ...

        def _update_log_pos(self, position: int):
            # persist to your storage (DB, Redis, file, etc.)
            ...


    class CustomTarget(SnowflakeTarget):

        def _save(self, records: List[Record], **kwargs):
            # Nothing to merge; without this guard records[0] would raise IndexError.
            if not records:
                return

            # Assumes every record in the batch belongs to the same table.
            logger.info(f"Saving: {records}")
            query, params = self.client.get_merge_dml(
                target=records[0].table_name,
                columns=list(records[0].record.keys()),
                pk_ids=["id"],
                records=[rec.record for rec in records],
            )

            self.execute(f"USE SCHEMA {records[0].schema_name}")
            self.client.execute(query, params=params)
            self.client.commit()

        def execute(self, query: str):
            self.client.execute(query)


    CONF = {
        "account": os.getenv("SNOWFLAKE_ACCOUNT", ""),
        "user": os.getenv("SNOWFLAKE_USER", ""),
        "password": os.getenv("SNOWFLAKE_PASSWORD", ""),
        "database": os.getenv("SNOWFLAKE_DATABASE", ""),
        "schema": os.getenv("SNOWFLAKE_SCHEMA", "PUBLIC"),
        "warehouse": os.getenv("SNOWFLAKE_WAREHOUSE", ""),
        "role": os.getenv("SNOWFLAKE_ROLE", ""),
    }

    cxn_params = {"host": "localhost", "port": 3306, "user": "root", "passwd": "mysql_password"}

    target = CustomTarget(logger=logger, execute_ddl=True, send_data=True)
    target.init_client(**CONF)
    target.client.connect()
    target.client.execute(f"USE DATABASE {CONF['database']}")
    stream = BinLogStreamReader(
        resume_stream=False,
        connection_settings=cxn_params,
        blocking=True,
        freeze_schema=False,
        server_id=1,
    )

    processor = CustomMySqlBinlogProcessor(
        stream=stream,
        targets=[target],
        connection_settings=cxn_params,
        service=os.getenv("SERVICE_NAME", "my-cdc-service"),
        logger=logger,
    )

    processor.execute()
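
The ``_update_log_file`` and ``_update_log_pos`` stubs above are where the binlog
coordinates get persisted. A minimal sketch, assuming a local JSON file is an acceptable
store (a database or Redis is more robust when the service can move between hosts): the
hypothetical ``FileCheckpoint`` helper below writes both values to a temporary file and
swaps it in with ``os.replace`` (atomic on POSIX file systems), then reads them back on
startup.

.. code-block:: python

    import json
    import os
    import tempfile
    from typing import Optional, Tuple


    class FileCheckpoint:
        """Hypothetical helper: stores binlog coordinates in a small JSON file."""

        def __init__(self, path: str) -> None:
            self.path = path

        def load(self) -> Tuple[Optional[str], Optional[int]]:
            """Return (log_file, log_pos), or (None, None) if nothing is stored yet."""
            if not os.path.exists(self.path):
                return None, None
            with open(self.path, encoding="utf-8") as fh:
                data = json.load(fh)
            return data.get("log_file"), data.get("log_pos")

        def update_log_file(self, log_file_name: str) -> None:
            current_file, current_pos = self.load()
            # A position is only meaningful within one binlog file: keep it when
            # the file is unchanged, clear it when the file changes (rotation).
            pos = current_pos if current_file == log_file_name else None
            self._write(log_file_name, pos)

        def update_log_pos(self, position: int) -> None:
            log_file, _ = self.load()
            self._write(log_file, position)

        def _write(self, log_file: Optional[str], log_pos: Optional[int]) -> None:
            # Write to a temp file in the same directory, then replace the old
            # checkpoint so a crash never leaves a half-written file behind.
            directory = os.path.dirname(os.path.abspath(self.path))
            fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
            with os.fdopen(fd, "w", encoding="utf-8") as fh:
                json.dump({"log_file": log_file, "log_pos": log_pos}, fh)
            os.replace(tmp_path, self.path)

Inside ``CustomMySqlBinlogProcessor``, the two stubs could delegate to
``checkpoint.update_log_file(log_file_name)`` and ``checkpoint.update_log_pos(position)``.
On restart, the values returned by ``checkpoint.load()`` could be passed to
``BinLogStreamReader`` through its ``log_file`` and ``log_pos`` arguments together with
``resume_stream=True``. Whether ``MySqlBinlogProcessor`` already restores the position
itself is not covered on this page, so treat that wiring as an assumption to verify.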