core-cdc
core-cdc provides the core mechanism and the resources required to implement “Change Data Capture” (CDC) services…
Features
- Multi-Database CDC Support
  - MySQL Binary Log (BinLog) based change capture
  - MongoDB Change Streams for real-time event streaming
  - Extensible processor architecture for additional database engines
- Comprehensive Event Handling
  - DML operations: INSERT, UPDATE, DELETE
  - DDL operations: CREATE, ALTER, DROP (schemas and tables)
  - Configurable event filtering by operation type
- Flexible Target Replication
  - Implement your own target by subclassing ITarget
  - Send records to any destination: database, queue, data warehouse, etc.
  - Support for multiple simultaneous targets
- Standardized Data Format
  - Common Record structure for cross-service integration
  - Includes metadata: timestamps, transaction IDs, source position
  - JSON serialization support for streaming and messaging systems
- Production-Ready Features
  - Built-in error handling and retry mechanisms
  - Comprehensive logging for monitoring and debugging
  - Optional event timestamp column for UPSERT/MERGE operations
Installation
Install from PyPI using pip:
pip install core-cdc
uv pip install core-cdc  # or using uv
pip install -e ".[dev]"  # for development, from a source checkout
Setting Up the Environment
Install required libraries:
pip install --upgrade pip
pip install virtualenv
Create Python virtual environment:
virtualenv --python=python3.12 .venv
Activate the virtual environment:
source .venv/bin/activate
Install packages
pip install .
pip install -e ".[dev]"
Optional libraries
pip install '.[all]' # MySQL + MongoDB
pip install '.[mysql]' # MySQL BinLog support
pip install '.[mongo]' # MongoDB Change Streams support
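To confirm which optional backends are importable after installing an extra, a small check with `importlib.util.find_spec` is enough. The extra-to-module mapping is an assumption: the mysql extra is taken to install mysql-replication (imported as `pymysqlreplication`) and the mongo extra to install `pymongo`; the project's packaging metadata is the authority.

```python
from importlib.util import find_spec

# Import name of the driver behind each optional extra.
# ASSUMPTION: "mysql" installs mysql-replication (imported as pymysqlreplication)
# and "mongo" installs pymongo; check the project's packaging metadata.
EXTRAS = {"mysql": "pymysqlreplication", "mongo": "pymongo"}


def available_extras() -> dict[str, bool]:
    """Map each optional extra to whether its driver module can be imported."""
    return {extra: find_spec(module) is not None for extra, module in EXTRAS.items()}


if __name__ == "__main__":
    for extra, ok in available_extras().items():
        hint = "" if ok else f"  (pip install '.[{extra}]')"
        print(f"{extra}: {'installed' if ok else 'missing'}{hint}")
```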
Check tests and coverage
python manager.py run-tests # unit tests
python manager.py run-tests --test-type integration
python manager.py run-coverage
Functional Tests
Functional tests execute against real database servers and are not run
automatically by pytest or tox. They must be invoked explicitly after
the required Docker containers are running.
The quickest way to run both MySQL and MongoDB functional tests in one shot is
the helper script tests/functional/quick_test.sh. It checks connectivity,
runs both test suites via python -m pytest, and prints a metrics summary
from the JSON result files written by each test.
bash tests/functional/quick_test.sh
All connection parameters are overridable via environment variables (see the table below). The script defaults match the Docker commands in the next section.
Alternatively, run the functional test files through manager.py, using --pattern to choose which files run:
python manager.py run-tests --test-type functional --pattern "*.py"
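Before running the suites by hand, a plain TCP probe can confirm the servers are listening. This is a hypothetical pre-flight helper, not the logic inside quick_test.sh; the host/port pairs mirror the Docker commands in the next section and should be adjusted if you override the connection settings.

```python
import socket


def is_reachable(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # refused, unreachable, DNS failure, or timeout
        return False


# Ports published by the Docker commands (MongoDB replica set + MySQL).
SERVERS = {
    "mongo1": ("localhost", 27017),
    "mongo2": ("localhost", 27018),
    "mongo3": ("localhost", 27019),
    "mysql": ("localhost", 3306),
}

if __name__ == "__main__":
    for name, (host, port) in SERVERS.items():
        status = "up" if is_reachable(host, port) else "DOWN"
        print(f"{name:7} {host}:{port} {status}")
```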
Environment variables accepted by the functional tests:
| Variable | Default | Description |
|---|---|---|
| | | MongoDB host |
| | | MongoDB port |
| | | MongoDB database name |
| | (none) | MongoDB username (optional) |
| | (none) | MongoDB password (optional) |
| | | MySQL host |
| | | MySQL database name |
| | | MySQL user |
| | | MySQL password |
Spinning Up Local Servers with Docker
MongoDB Replica Set (required for Change Streams):
docker network create mongoCluster
docker run -d --rm -p 27017:27017 --name mongo1 --network mongoCluster \
mongo:5 mongod --replSet myReplicaSet --bind_ip localhost,mongo1
docker run -d --rm -p 27018:27017 --name mongo2 --network mongoCluster \
mongo:5 mongod --replSet myReplicaSet --bind_ip localhost,mongo2
docker run -d --rm -p 27019:27017 --name mongo3 --network mongoCluster \
mongo:5 mongod --replSet myReplicaSet --bind_ip localhost,mongo3
docker exec -it mongo1 mongosh --eval "rs.initiate({
_id: \"myReplicaSet\",
members: [
{_id: 0, host: \"mongo1\"},
{_id: 1, host: \"mongo2\"},
{_id: 2, host: \"mongo3\"}
]
})"
Check cluster status:
docker ps
docker exec -it mongo1 mongosh --eval "rs.status()"
MySQL (binary logging is enabled by default in MySQL 8.0 and later, including the official image):
docker run \
--env=MYSQL_ROOT_PASSWORD=mysql_password \
--volume=/var/lib/mysql \
-p 3306:3306 \
--restart=no \
-d mysql:latest
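python-mysql-replication reads row events from the binary log, so the server must log in row-based format. The sketch below checks those settings, assuming you pass it the result of `SHOW VARIABLES WHERE Variable_name IN ('log_bin', 'binlog_format', 'binlog_row_image')` as a name-to-value dict. Requiring binlog_row_image=FULL is an assumption based on UPDATE capture needing complete before/after row images (MINIMAL logs roughly only the key and changed columns); `binlog_problems` is a hypothetical helper.

```python
# Settings row-based CDC via python-mysql-replication is assumed to need.
REQUIRED = {
    "log_bin": "ON",
    "binlog_format": "ROW",
    "binlog_row_image": "FULL",
}


def binlog_problems(variables: dict[str, str]) -> list[str]:
    """Compare SHOW VARIABLES output with the expected settings; [] means OK."""
    problems = []
    for name, expected in REQUIRED.items():
        actual = variables.get(name)
        if actual is None or actual.upper() != expected:
            problems.append(f"{name}={actual!r}, expected {expected}")
    return problems
```

With a DB-API cursor, `dict(cursor.fetchall())` turns the SHOW VARIABLES rows into that mapping.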
Implemented CDC Engines
The following database engines have CDC implementations:
Fully Implemented
- MySQL: Binary Log (BinLog) based CDC
  - Uses the mysql-replication library
  - Captures INSERT, UPDATE, DELETE operations
  - Supports DDL events (CREATE, ALTER, DROP)
  - Fallback mechanism for column name resolution
- MongoDB: Change Streams based CDC
  - Uses native MongoDB Change Streams
  - Captures INSERT, UPDATE, DELETE operations
  - Requires a replica set configuration
  - Real-time event streaming
Planned / Documentation Only
MS SQL Server and Oracle implementations are not yet included. Reference guides and implementation templates are available in the documentation.
Contributing
Contributions are welcome! Please:
- Fork the repository.
- Create a feature branch.
- Write tests for new functionality.
- Ensure all tests pass:
  python manager.py run-tests --test-type integration
- Run linting:
  pylint core_cdc
- Run security checks:
  bandit -r core_cdc
- Submit a pull request.
License
This project is licensed under the MIT License. See the LICENSE file for details.
Links
Documentation: https://core-cdc.readthedocs.io/en/latest/
Repository: bytecode-solutions/core/core-cdc
Changelog: bytecode-solutions/core/core-cdc/-/blob/master/CHANGELOG.md
Support
For questions or support, please open an issue on GitLab or contact the maintainers.