Source code for core_cdc.targets.base

# -*- coding: utf-8 -*-

"""
Base target interface for CDC data replication.
"""

from __future__ import annotations

import re
from abc import ABC
from abc import abstractmethod
from logging import Logger
from typing import Any
from typing import List
from typing import Optional
from typing import Tuple

from core_mixins.interfaces.factory import IFactory
from core_mixins.utils import to_one_line
from pymysqlreplication.event import QueryEvent

from core_cdc.base import Record


class ITarget(IFactory, ABC):
    """
    This is the base class for the specific implementations of the targets
    the data will be sent or replicated to. A target could be a database,
    queue, topic, data warehouse, etc...

    Subclasses must implement ``_save``. They typically also override
    ``init_client`` (to create ``self.client``) and, when the target engine
    uses a different DDL dialect, the ``get_*_statement`` hooks.
    """

    # Matches backtick-quoted (`schema`.`table`) or unquoted (schema.table)
    # identifiers. Case-insensitive so mixed-case identifiers such as
    # `MyDb`.`Orders` are captured whole; a lowercase-only class would
    # silently return a partial name (e.g. "b" for "Db") or fail to match.
    # Compiled once here instead of on every call.
    _SCHEMA_TABLE_PATTERN = re.compile(
        r"`?([a-z0-9_]+)`?\.`?([a-z0-9_]+)`?",
        re.IGNORECASE,
    )

    def __init__(
        self,
        logger: Logger,
        execute_ddl: bool = False,
        send_data: bool = False,
    ) -> None:
        """
        :param logger: Logger used to report what the target does.
        :param execute_ddl: Flag stored on the instance; it is not read
            inside this class (presumably consulted by callers/subclasses
            before running DDL — verify against usage).
        :param send_data: When ``False``, ``save`` is a no-op.
        """
        # Populated by ``init_client`` in concrete implementations.
        self.client: Optional[Any] = None
        self.execute_ddl = execute_ddl
        self.send_data = send_data
        self.logger = logger

    @classmethod
    def registration_key(cls) -> str:
        """Return the registration key for this target class (its class name)."""
        return cls.__name__

    def init_client(self, **kwargs) -> None:
        """
        Create the target's client and assign it to ``self.client``.

        The base implementation does nothing; target implementations are
        expected to override it.
        """

    def connect(self) -> None:
        """Connect the target client, if one has been created."""
        # Compare with None explicitly: some client objects define __len__
        # or refuse truth-value testing, so ``if self.client`` is unreliable.
        if self.client is not None:
            self.client.connect()

    def get_ddl_query(self, event: QueryEvent) -> str:
        """
        Return the statement to run on the target for a DDL ``QueryEvent``.

        The query is lower-cased and collapsed to one line *only* to classify
        it. The matching ``get_*_statement`` hook then receives the original
        event, so each engine can override how every DDL kind is translated.

        :param event: The binlog query event carrying the DDL statement.
        :return: The statement produced by the matching hook or, when the
            statement is not one of the recognized kinds, the original query
            collapsed to a single line with its case preserved.
        """
        sql_statement = to_one_line(event.query.lower())

        if "create schema" in sql_statement or "create database" in sql_statement:
            return self.get_create_schema_statement(event)

        if "drop schema" in sql_statement or "drop database" in sql_statement:
            return self.get_drop_schema_statement(event)

        if "create table" in sql_statement:
            return self.get_create_table_statement(event)

        if "alter table" in sql_statement:
            return self.get_alter_table_statement(event)

        if "drop table" in sql_statement:
            return self.get_drop_table_statement(event)

        # Do not return the lower-cased copy: lower-casing would rewrite
        # string literals and case-sensitive identifiers in the replicated
        # statement (e.g. view definitions, renames).
        return to_one_line(event.query)

    @staticmethod
    def get_add_column_ddl(schema: str, table: str, column: str, type_: str) -> str:
        """
        Return the DDL that adds ``column`` of type ``type_`` to
        ``schema``.``table`` (MySQL backtick-quoted syntax).
        """
        return f"ALTER TABLE `{schema}`.`{table}` ADD COLUMN `{column}` {type_};"

    @staticmethod
    def get_create_schema_statement(event: QueryEvent) -> str:
        """Get the DDL statement for creating a schema (default: the original query)."""
        return event.query

    @staticmethod
    def get_drop_schema_statement(event: QueryEvent) -> str:
        """Get the DDL statement for dropping a schema (default: the original query)."""
        return event.query

    @staticmethod
    def get_create_table_statement(event: QueryEvent) -> str:
        """Get the DDL statement for creating a table (default: the original query)."""
        return event.query

    @staticmethod
    def get_alter_table_statement(event: QueryEvent) -> str:
        """Get the DDL statement for altering a table (default: the original query)."""
        return event.query

    @staticmethod
    def get_schema_table_from_query(query: str) -> Tuple[str, str]:
        """
        Extract the first ``schema.table`` reference from ``query``.

        Both backtick-quoted (`schema`.`table`) and unquoted (schema.table)
        forms are recognized; identifier case is preserved in the result.

        :param query: The SQL text to search.
        :return: ``(schema, table)``.
        :raises ValueError: If no qualified ``schema.table`` name is found.
        """
        match = ITarget._SCHEMA_TABLE_PATTERN.search(query)
        if match is None:
            raise ValueError(f"Could not extract schema and table from query: {query}")

        return match.group(1), match.group(2)

    @staticmethod
    def get_drop_table_statement(event: QueryEvent) -> str:
        """Get the DDL statement for dropping a table (default: the original query)."""
        return event.query

    def execute(self, query: str) -> None:
        """Execute ``query`` through the target client, if one has been created."""
        # Explicit None check for the same reason as in ``connect``.
        if self.client is not None:
            self.client.execute(query)

    def save(self, records: List[Record], **kwargs) -> None:
        """
        Send ``records`` to the target through ``_save``, then log how many
        were sent. Does nothing when ``send_data`` is ``False``.
        """
        if self.send_data:
            self._save(records, **kwargs)
            self.logger.info("%d records were sent to: %s!", len(records), self.registration_key())

    @abstractmethod
    def _save(self, records: List[Record], **kwargs) -> None:
        """Engine-specific implementation that stores the records in the target."""

    def close(self) -> None:
        """
        Release or close resources held by the target.

        The base implementation does nothing; override it when required.
        """