Source code for core_cdc.processors.base

# -*- coding: utf-8 -*-

"""
Base processor interface for CDC (Change Data Capture) 
implementations. This module defines the IProcessor abstract 
base class that all CDC processors must implement, along 
with common CRUD event types.
"""

import json
from abc import abstractmethod
from logging import Logger
from typing import Any
from typing import Iterable
from typing import Iterator
from typing import List

from core_mixins.interfaces.factory import IFactory

from core_cdc.base import EventType, Record
from core_cdc.targets.base import ITarget

# Event types that ``IProcessor`` streams to its targets when no explicit
# ``events_to_stream`` is given: the row-level insert/update/delete operations.
CRUD_EVENTS = (EventType.INSERT, EventType.UPDATE, EventType.DELETE)


class IProcessor(IFactory):
    """
    Interface for all "Change Data Capture" implementations.

    Subclasses provide the event source (``get_events``), the event
    classification (``get_event_type``) and the conversion of DML events
    into records (``process_dml_event``). ``execute`` drives the loop and
    forwards DDL statements and DML records to every configured target.
    """

    def __init__(  # pylint: disable=too-many-arguments,too-many-positional-arguments
        self,
        service: str,
        targets: List[ITarget],
        logger: Logger,
        events_to_stream: Iterable[EventType] = CRUD_EVENTS,
        add_event_timestamp: bool = False,
    ) -> None:
        """
        :param service: The name of the service that is doing the CDC process.
        :param targets: Targets that receive the DDL statements and the DML records.
        :param logger: Logger used.
        :param events_to_stream: Event types whose records are streamed to the
            targets. By default, insert, update and delete operations are streamed.
        :param add_event_timestamp: If True, the column ``event_timestamp`` is
            added when a table is created. The column could be useful for
            UPSERT or MERGE operations.
        """
        self.service = service
        self.targets = targets
        # Materialized once: ``execute`` runs a membership test for every event,
        # and a one-shot iterable (e.g. a generator) would be exhausted after the
        # first test, silently routing later events away from the targets.
        self.events_to_stream = tuple(events_to_stream)
        self.add_event_timestamp = add_event_timestamp
        self.logger = logger

    @classmethod
    def registration_key(cls) -> str:
        """Return the registration key of this processor: its class name."""
        return cls.__name__

    def execute(self) -> None:
        """
        Execute the CDC processor to read and process events from the stream.

        DDL statements are forwarded to every target with ``execute_ddl``
        enabled, events whose type is in ``events_to_stream`` are converted
        into records and saved in every target, and any other event is passed
        to ``process_event``. Errors are logged and do not stop the loop.
        """
        self.logger.info("Reading events from the stream...")
        for event in self.get_events():
            event_type = self.get_event_type(event)

            if event_type == EventType.DDL_STATEMENT:
                self._handle_ddl_event(event)

            elif event_type in self.events_to_stream:
                self._handle_dml_event(event, event_type)

            else:
                # NOTE: besides non-DML events, this branch also receives CRUD
                # events that were excluded from ``events_to_stream``.
                try:
                    self.process_event(event)
                except Exception as error:  # pylint: disable=broad-exception-caught
                    self.logger.error("Error: %s", error, exc_info=True)

    def _handle_ddl_event(self, event: Any) -> None:
        """Execute a DDL event in every target that has ``execute_ddl`` enabled."""
        for target in self.targets:
            if not target.execute_ddl:
                continue

            key = target.registration_key()
            try:
                query = target.get_ddl_query(event)
                target.execute(query)
                self.logger.info("The below query was executed in: %s.", key)
                self.logger.info("%s", query)

                if self.add_event_timestamp and "create table" in query.lower():
                    # Use the original event query (before any target-specific
                    # transformation) so that backtick-based schema/table
                    # extraction works regardless of the target dialect.
                    source_query = getattr(event, "query", query)
                    schema, table = ITarget.get_schema_table_from_query(
                        source_query.lower()
                    )
                    target.execute(
                        target.get_add_column_ddl(
                            schema=schema,
                            table=table,
                            column="event_timestamp",
                            type_="bigint",
                        )
                    )

            except Exception as error:  # pylint: disable=broad-exception-caught
                self.logger.error("[%s] -- %s", key, error, exc_info=True)

    def _handle_dml_event(self, event: Any, event_type: EventType) -> None:
        """Convert a DML event into records once and save them in every target."""
        # The records do not depend on the target, so the event is processed
        # only once; a failure here is not attributable to any target.
        try:
            records = self.process_dml_event(event)
        except Exception as error:  # pylint: disable=broad-exception-caught
            self.logger.error("Error: %s", error, exc_info=True)
            return

        if not records:
            return

        for target in self.targets:
            key = target.registration_key()
            try:
                target.save(records)
                self.logger.info(json.dumps({
                    "target": key,
                    "number_of_records": len(records),
                    # ``.value`` keeps the log line serializable if EventType is
                    # a plain Enum; str/int-based members serialize to that same
                    # value, and non-enum values pass through unchanged.
                    "event_type": getattr(event_type, "value", event_type),
                    "schema": records[0].schema_name,
                    "table": records[0].table_name,
                }))
            except Exception as error:  # pylint: disable=broad-exception-caught
                self.logger.error("[%s] -- %s", key, error, exc_info=True)

    @abstractmethod
    def get_events(self) -> Iterator[Any]:
        """It returns an iterator with the events to process."""

    @abstractmethod
    def get_event_type(self, event: Any) -> EventType:
        """It returns the event type."""

    @abstractmethod
    def process_dml_event(self, event: Any) -> List[Record]:
        """It processes the event and returns the records to stream."""

    def process_event(self, event: Any) -> None:
        """
        Hook for events that are neither DDL statements nor streamed DML
        events. It should be implemented if such events must be processed;
        the default implementation does nothing.
        """