Processors#

Base Interface#

Base processor interface for CDC (Change Data Capture) implementations. This module defines the IProcessor abstract base class that all CDC processors must implement, along with common CRUD event types.

class core_cdc.processors.base.IProcessor(service: str, targets: List[ITarget], logger: Logger, events_to_stream: Iterable[EventType] = (EventType.INSERT, EventType.UPDATE, EventType.DELETE), add_event_timestamp: bool = False)[source]#

Bases: IFactory

Interface for all “Change Data Capture” implementations

__init__(service: str, targets: List[ITarget], logger: Logger, events_to_stream: Iterable[EventType] = (EventType.INSERT, EventType.UPDATE, EventType.DELETE), add_event_timestamp: bool = False) None[source]#
Parameters:
  • service – The name of the service that is doing the CDC process.

  • events_to_stream – By default, insert, update and, delete operations will be streamed.

  • logger – Logger used.

  • add_event_timestamp – If True, the column event_timestamp will be added when a table is created. The column could be useful for UPSERT or MERGE operations.

classmethod registration_key() str[source]#

It returns the name (reference) for the key used to register, like: return self.__name__

execute() None[source]#

Execute the CDC processor to read and process events from the stream. This method processes DDL and DML events, forwarding them to configured targets.

abstractmethod get_events() Iterator[Any][source]#

It returns an iterator with the events to process

abstractmethod get_event_type(event: Any) EventType[source]#

It returns the event type

abstractmethod process_dml_event(event: Any) List[Record][source]#

It processes the event and return the records to stream

process_event(event: Any) None[source]#

It should be implemented if another event (apart from DDL and DML) must be processed…

_abc_impl = <_abc._abc_data object>#
_impls: Dict[str, Type[Self]] = {}#