core-cdc#

core-cdc provides the core mechanisms and resources required to implement “Change Data Capture” (CDC) services…




Features#

Multi-Database CDC Support
  • MySQL Binary Log (BinLog) based change capture

  • MongoDB Change Streams for real-time event streaming

  • Extensible processor architecture for additional database engines

Comprehensive Event Handling
  • DML operations: INSERT, UPDATE, DELETE

  • DDL operations: CREATE, ALTER, DROP (schemas and tables)

  • Configurable event filtering by operation type

Flexible Target Replication
  • Implement your own target by subclassing ITarget

  • Send records to any destination: database, queue, data warehouse, etc.

  • Support for multiple simultaneous targets

Standardized Data Format
  • Common Record structure for cross-service integration

  • Includes metadata: timestamps, transaction IDs, source position

  • JSON serialization support for streaming and messaging systems

Production-Ready Features
  • Built-in error handling and retry mechanisms

  • Comprehensive logging for monitoring and debugging

  • Optional event timestamp column for UPSERT/MERGE operations
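
To make the feature list above more concrete, here is a minimal, self-contained sketch of a common record format, operation-type filtering, and fan-out to several targets. It does not import core-cdc: `Record`, `Target`, `ListTarget`, and `replicate` are hypothetical stand-ins written for illustration, and the real `Record` and `ITarget` classes may use different names and signatures.

```python
import json
from abc import ABC, abstractmethod
from dataclasses import asdict, dataclass, field
from typing import Any, Dict, Iterable, List, Optional, Set


@dataclass
class Record:
    """Hypothetical stand-in for a common change record (not the core-cdc class)."""
    operation: str                      # e.g. "INSERT", "UPDATE", "DELETE"
    schema: str
    table: str
    data: Dict[str, Any]
    timestamp: float                    # event time, seconds since the epoch
    transaction_id: Optional[str] = None
    position: Dict[str, Any] = field(default_factory=dict)  # source position

    def to_json(self) -> str:
        """Serialize the record for a queue or messaging system."""
        return json.dumps(asdict(self), sort_keys=True)


class Target(ABC):
    """Hypothetical target interface; core-cdc's ITarget may look different."""

    @abstractmethod
    def save(self, records: List[Record]) -> None:
        ...


class ListTarget(Target):
    """Keeps serialized records in memory; a real target would write elsewhere."""

    def __init__(self) -> None:
        self.sent: List[str] = []

    def save(self, records: List[Record]) -> None:
        self.sent.extend(record.to_json() for record in records)


def replicate(records: Iterable[Record], targets: List[Target], allowed_ops: Set[str]) -> int:
    """Filter by operation type, then send the same batch to every target."""
    batch = [record for record in records if record.operation in allowed_ops]
    for target in targets:
        target.save(batch)
    return len(batch)


events = [
    Record("INSERT", "shop", "orders", {"id": 1, "total": 9.5}, 1700000000.0, "tx-1"),
    Record("DELETE", "shop", "orders", {"id": 1}, 1700000001.0, "tx-2"),
]
queue, warehouse = ListTarget(), ListTarget()
sent = replicate(events, [queue, warehouse], allowed_ops={"INSERT", "UPDATE"})
```

Here the DELETE event is filtered out, and both in-memory targets receive the same serialized INSERT record.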

Installation#

Install from PyPI with pip or uv, or install a local checkout in editable mode for development:

pip install core-cdc
uv pip install core-cdc     # Or using UV...
pip install -e ".[dev]"     # For development...

Setting Up Environment#

  1. Install required libraries:

pip install --upgrade pip
pip install virtualenv

  2. Create a Python virtual environment:

virtualenv --python=python3.12 .venv

  3. Activate the virtual environment:

source .venv/bin/activate

Install packages#

pip install .
pip install -e ".[dev]"

Optional libraries#

pip install '.[all]'    # MySQL + MongoDB
pip install '.[mysql]'  # MySQL BinLog support
pip install '.[mongo]'  # MongoDB Change Streams support
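
After installing the extras, a quick check can confirm which optional drivers are importable. This is only a sketch: the module names are assumptions (`pymysqlreplication` is the import name of the mysql-replication package, and `pymongo` is assumed to back the mongo extra), and `available_engines` is a hypothetical helper, not part of core-cdc.

```python
from importlib.util import find_spec
from typing import Dict

# Assumed import names for the optional extras (not taken from core-cdc itself).
OPTIONAL_MODULES = {"mysql": "pymysqlreplication", "mongo": "pymongo"}


def available_engines(modules: Dict[str, str] = OPTIONAL_MODULES) -> Dict[str, bool]:
    """Report which optional drivers can be found, without importing them."""
    return {engine: find_spec(module) is not None for engine, module in modules.items()}


if __name__ == "__main__":
    for engine, installed in available_engines().items():
        print(f"{engine}: {'installed' if installed else 'missing'}")
```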

Check tests and coverage#

python manager.py run-tests                   # unit tests
python manager.py run-tests --test-type integration
python manager.py run-coverage

Functional Tests#

Functional tests execute against real database servers and are not run automatically by pytest or tox. They must be invoked explicitly after the required Docker containers are running.

The quickest way to run both MySQL and MongoDB functional tests in one shot is the helper script tests/functional/quick_test.sh. It checks connectivity, runs both test suites via python -m pytest, and prints a metrics summary from the JSON result files written by each test.

bash tests/functional/quick_test.sh

All connection parameters are overridable via environment variables (see the table below). The script defaults match the Docker commands in the next section.

Alternatively, run the functional test files through manager.py, using --pattern to select which files to run:

python manager.py run-tests --test-type functional --pattern "*.py"

Environment variables accepted by the functional tests:

Variable              Default          Description
--------------------  ---------------  ---------------------------
HOST_TEST_MONGO       localhost        MongoDB host
PORT_TEST_MONGO       27017            MongoDB port
DATABASE_TEST_MONGO   test             MongoDB database name
USER_TEST_MONGO       (none)           MongoDB username (optional)
PASSWORD_TEST_MONGO   (none)           MongoDB password (optional)
HOST_TEST_MYSQL       localhost        MySQL host
DATABASE_TEST_MYSQL   tests            MySQL database name
USER_TEST_MYSQL       root             MySQL user
PASSWORD_TEST_MYSQL   mysql_password   MySQL password
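
The variables listed above could be read in a test helper roughly as follows. The variable names and defaults come from the list; `DEFAULTS` and `load_settings` are hypothetical names used for illustration and do not describe how the project's own tests read them.

```python
import os
from typing import Any, Dict, Mapping, Optional

# Defaults copied from the list above; None marks an optional value.
DEFAULTS: Dict[str, Optional[str]] = {
    "HOST_TEST_MONGO": "localhost",
    "PORT_TEST_MONGO": "27017",
    "DATABASE_TEST_MONGO": "test",
    "USER_TEST_MONGO": None,
    "PASSWORD_TEST_MONGO": None,
    "HOST_TEST_MYSQL": "localhost",
    "DATABASE_TEST_MYSQL": "tests",
    "USER_TEST_MYSQL": "root",
    "PASSWORD_TEST_MYSQL": "mysql_password",
}


def load_settings(env: Mapping[str, str] = os.environ) -> Dict[str, Dict[str, Any]]:
    """Merge environment overrides with the documented defaults."""
    values = {name: env.get(name, default) for name, default in DEFAULTS.items()}
    return {
        "mongo": {
            "host": values["HOST_TEST_MONGO"],
            "port": int(values["PORT_TEST_MONGO"]),  # ports arrive as strings
            "database": values["DATABASE_TEST_MONGO"],
            "user": values["USER_TEST_MONGO"],
            "password": values["PASSWORD_TEST_MONGO"],
        },
        "mysql": {
            "host": values["HOST_TEST_MYSQL"],
            "database": values["DATABASE_TEST_MYSQL"],
            "user": values["USER_TEST_MYSQL"],
            "password": values["PASSWORD_TEST_MYSQL"],
        },
    }
```

Passing a plain dictionary instead of `os.environ` makes the helper easy to exercise without touching the real environment.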

Spinning Up Local Servers with Docker#

MongoDB Replica Set (required for Change Streams):

docker network create mongoCluster

docker run -d --rm -p 27017:27017 --name mongo1 --network mongoCluster \
    mongo:5 mongod --replSet myReplicaSet --bind_ip localhost,mongo1

docker run -d --rm -p 27018:27017 --name mongo2 --network mongoCluster \
    mongo:5 mongod --replSet myReplicaSet --bind_ip localhost,mongo2

docker run -d --rm -p 27019:27017 --name mongo3 --network mongoCluster \
    mongo:5 mongod --replSet myReplicaSet --bind_ip localhost,mongo3

docker exec -it mongo1 mongosh --eval "rs.initiate({
  _id: \"myReplicaSet\",
  members: [
    {_id: 0, host: \"mongo1\"},
    {_id: 1, host: \"mongo2\"},
    {_id: 2, host: \"mongo3\"}
  ]
})"

Check cluster status:

docker ps
docker exec -it mongo1 mongosh --eval "rs.status()"
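
Once the replica set reports healthy, a change stream can be opened from the host. The sketch below uses pymongo directly rather than core-cdc's MongoDB processor; `mongo_uri`, `watch_pipeline`, and `stream_changes` are hypothetical helper names. It assumes pymongo is installed and connects to the single member published on port 27017, using `directConnection=true` because the member names (mongo1, mongo2, mongo3) normally resolve only inside the Docker network.

```python
from typing import Any, Dict, Iterator, List
from urllib.parse import urlencode


def mongo_uri(host: str = "localhost", port: int = 27017) -> str:
    """Build a URI that talks to one member without following the replica-set host list."""
    return f"mongodb://{host}:{port}/?{urlencode({'directConnection': 'true'})}"


def watch_pipeline(operations: List[str]) -> List[Dict[str, Any]]:
    """Aggregation pipeline that keeps only the given change-stream operation types."""
    return [{"$match": {"operationType": {"$in": list(operations)}}}]


def stream_changes(database: str = "test") -> Iterator[Dict[str, Any]]:
    """Yield change events; needs pymongo and the replica set started above."""
    from pymongo import MongoClient  # imported lazily: optional dependency

    client = MongoClient(mongo_uri())
    pipeline = watch_pipeline(["insert", "update", "delete"])
    with client[database].watch(pipeline) as stream:
        for change in stream:
            yield change


if __name__ == "__main__":
    for change in stream_changes():
        print(change["operationType"], change.get("documentKey"))
```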

MySQL (binary logging, which BinLog-based capture depends on, is enabled by default in the official image):

docker run \
  --env=MYSQL_ROOT_PASSWORD=mysql_password \
  --volume=/var/lib/mysql \
  -p 3306:3306 \
  --restart=no \
  -d mysql:latest
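
With the container running, row events can be read from the binary log. The following is a sketch that uses the mysql-replication library (`pymysqlreplication`) directly, not core-cdc's MySQL processor; `MYSQL_SETTINGS`, `describe`, and `stream_rows` are hypothetical names, the credentials mirror the docker command above, and the `server_id` value is an arbitrary assumption that only needs to differ from the server's own id.

```python
from typing import Any, Dict, Iterator

# Connection values mirror the docker command above.
MYSQL_SETTINGS = {"host": "localhost", "port": 3306, "user": "root", "passwd": "mysql_password"}


def describe(kind: str, schema: str, table: str, row: Dict[str, Any]) -> Dict[str, Any]:
    """Normalize one binlog row; UPDATE rows carry before and after images."""
    data = row["after_values"] if kind == "UPDATE" else row["values"]
    return {"operation": kind, "schema": schema, "table": table, "data": data}


def stream_rows(server_id: int = 100) -> Iterator[Dict[str, Any]]:
    """Yield normalized row changes; needs mysql-replication and a running server."""
    # Imported lazily because the MySQL driver is an optional dependency.
    from pymysqlreplication import BinLogStreamReader
    from pymysqlreplication.row_event import DeleteRowsEvent, UpdateRowsEvent, WriteRowsEvent

    kinds = {WriteRowsEvent: "INSERT", UpdateRowsEvent: "UPDATE", DeleteRowsEvent: "DELETE"}
    stream = BinLogStreamReader(
        connection_settings=MYSQL_SETTINGS,
        server_id=server_id,
        only_events=list(kinds),
        blocking=True,  # wait for new events instead of stopping at the end of the log
    )
    try:
        for event in stream:
            for row in event.rows:
                yield describe(kinds[type(event)], event.schema, event.table, row)
    finally:
        stream.close()


if __name__ == "__main__":
    for change in stream_rows():
        print(change)
```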

Implemented CDC Engines#

The following database engines have CDC implementations:

Fully Implemented

MySQL - Binary Log (BinLog) based CDC
  • Uses mysql-replication library

  • Captures INSERT, UPDATE, DELETE operations

  • Supports DDL events (CREATE, ALTER, DROP)

  • Fallback mechanism for column name resolution

  • See: core_cdc/processors/mysql/

MongoDB - Change Streams based CDC
  • Uses native MongoDB Change Streams

  • Captures INSERT, UPDATE, DELETE operations

  • Requires replica set configuration

  • Real-time event streaming

  • See: core_cdc/processors/mongo/

Planned / Documentation Only

MS SQL Server and Oracle implementations are not yet included. Reference guides and implementation templates are available in the documentation.

Contributing#

Contributions are welcome! Please:

  1. Fork the repository

  2. Create a feature branch

  3. Write tests for new functionality

  4. Ensure all tests pass: python manager.py run-tests --test-type integration

  5. Run linting: pylint core_cdc

  6. Run security checks: bandit -r core_cdc

  7. Submit a pull request

License#

This project is licensed under the MIT License. See the LICENSE file for details.

Support#

For questions or support, please open an issue on GitLab or contact the maintainers.

Authors#