MS SQL Server
Microsoft SQL Server provides a built-in Change Data Capture feature that tracks
INSERT, UPDATE, and DELETE operations on selected tables at the database
level. A Python client can query the CDC change tables to consume those events.
How it works
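When CDC is enabled, SQL Server maintains a change table for each tracked table. A capture process reads the transaction log and populates these tables asynchronously, so no triggers or application changes are required. Each change row stores the affected column values together with metadata columns such as __$operation, which records the type of operation. Clients read changes by calling a system function rather than by subscribing to a log-streaming API.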
Enabling CDC
-- Enable CDC at the database level
EXEC sys.sp_cdc_enable_db;

-- Enable CDC for a specific table
EXEC sys.sp_cdc_enable_table
    @source_schema = 'dbo',
    @source_name = 'YourTableName',
    @role_name = 'cdc_admin';
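Once enabled, call cdc.fn_cdc_get_all_changes_<capture_instance> with a log
sequence number (LSN) range to retrieve changes for a specific capture instance.
Unless a name is supplied when the table is enabled, the capture instance
defaults to <schema>_<table>, here dbo_YourTableName.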
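As a minimal sketch of what such a call can look like from Python, the helper below builds a T-SQL batch that reads every change still retained for one capture instance. The helper name build_changes_query and its row_filter parameter are hypothetical and not part of this library; the T-SQL functions sys.fn_cdc_get_min_lsn and sys.fn_cdc_get_max_lsn are the ones SQL Server provides for finding the valid LSN range.

```python
import re

# Capture instance names are spliced into the function name, so they cannot be
# passed as query parameters. Restrict them to plain identifiers instead.
_IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")

# Row filter options accepted by cdc.fn_cdc_get_all_changes_<capture_instance>.
_ROW_FILTERS = ("all", "all update old")


def build_changes_query(capture_instance: str, row_filter: str = "all") -> str:
    """Return a T-SQL batch selecting all retained changes (hypothetical helper)."""
    if not _IDENTIFIER.match(capture_instance):
        raise ValueError(f"invalid capture instance name: {capture_instance!r}")
    if row_filter not in _ROW_FILTERS:
        raise ValueError(f"invalid row filter: {row_filter!r}")
    return (
        "DECLARE @from_lsn binary(10) = "
        f"sys.fn_cdc_get_min_lsn('{capture_instance}');\n"
        "DECLARE @to_lsn binary(10) = sys.fn_cdc_get_max_lsn();\n"
        f"SELECT * FROM cdc.fn_cdc_get_all_changes_{capture_instance}"
        f"(@from_lsn, @to_lsn, N'{row_filter}');"
    )


if __name__ == "__main__":
    print(build_changes_query("dbo_YourTableName"))
```

Starting from the minimum LSN rereads every retained change on each call. A real consumer would more likely remember the last LSN it processed and start from the next one.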
Implementing with IProcessor
To integrate MS SQL Server CDC with this library, subclass IProcessor and
implement the three abstract methods:
import pymssql

from core_cdc.processors.base import IProcessor
from core_cdc.base import EventType, Record


class MsSqlCdcProcessor(IProcessor):
    def get_events(self):
        # Query cdc.fn_cdc_get_all_changes_* for new rows
        ...

    def get_event_type(self, event) -> EventType:
        # __$operation codes: 1 = delete, 2 = insert,
        # 3 = update (before image), 4 = update (after image)
        operation = event["__$operation"]
        mapping = {
            1: EventType.DELETE,
            2: EventType.INSERT,
            4: EventType.UPDATE,
        }
        # Any other code, including 3, falls back to EventType.GLOBAL
        return mapping.get(operation, EventType.GLOBAL)

    def process_dml_event(self, event, **kwargs):
        return [Record(
            event_type=self.get_event_type(event),
            # Drop CDC metadata columns such as __$operation
            record={k: v for k, v in event.items()
                    if not k.startswith("__$")},
            ...
        )]
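The get_events stub above leaves open how rows become the dict-like events that get_event_type and process_dml_event index by column name. One possible approach, sketched here under assumptions, is to run the CDC query on a DB-API cursor and pair each row with the column names from cursor.description. The function name iter_change_rows is hypothetical. It assumes a cursor that returns tuples; pymssql can also return dicts directly when a connection or cursor is created with as_dict=True, in which case this step is unnecessary.

```python
from typing import Any, Dict, Iterator


def iter_change_rows(cursor: Any, query: str) -> Iterator[Dict[str, Any]]:
    """Run a CDC query and yield each change row as a dict keyed by column name.

    Hypothetical helper: `cursor` is any DB-API 2.0 cursor that returns tuples.
    """
    cursor.execute(query)
    # The first item of each description entry is the column name,
    # e.g. "__$start_lsn", "__$operation", then the tracked columns.
    columns = [col[0] for col in cursor.description]
    for row in cursor.fetchall():
        yield dict(zip(columns, row))
```

Each yielded dict keeps the __$ metadata columns, so the operation code is still available for get_event_type before process_dml_event strips the metadata.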