.. core-cdc documentation master file, created by
sphinx-quickstart on Mon Mar 31 22:11:12 2025.
You can adapt this file completely to your liking, but it should at least
contain the root `toctree` directive.
core-cdc
===============================================================================
core-cdc provides the core mechanisms and resources required to
implement Change Data Capture (CDC) services.
===============================================================================
.. image:: https://img.shields.io/pypi/pyversions/core-cdc.svg
:target: https://pypi.org/project/core-cdc/
:alt: Python Versions
.. image:: https://img.shields.io/badge/license-MIT-blue.svg
:target: https://gitlab.com/bytecode-solutions/core/core-cdc/-/blob/main/LICENSE
:alt: License
.. image:: https://gitlab.com/bytecode-solutions/core/core-cdc/badges/release/pipeline.svg
:target: https://gitlab.com/bytecode-solutions/core/core-cdc/-/pipelines
:alt: Pipeline Status
.. image:: https://readthedocs.org/projects/core-cdc/badge/?version=latest
:target: https://readthedocs.org/projects/core-cdc/
:alt: Docs Status
.. image:: https://img.shields.io/badge/security-bandit-yellow.svg
:target: https://github.com/PyCQA/bandit
:alt: Security
Documentation Contents
-------------------------------------------------------------------------------
.. toctree::
:maxdepth: 1
:caption: Index:
interfaces
processors
targets
Features
-------------------------------------------------------------------------------
**Multi-Database CDC Support**
- MySQL Binary Log (BinLog) based change capture
- MongoDB Change Streams for real-time event streaming
- Extensible processor architecture for additional database engines
**Comprehensive Event Handling**
- DML operations: INSERT, UPDATE, DELETE
- DDL operations: CREATE, ALTER, DROP (schemas and tables)
- Configurable event filtering by operation type
**Flexible Target Replication**
- Implement your own target by subclassing ``ITarget``
- Send records to any destination: database, queue, data warehouse, etc.
- Support for multiple simultaneous targets
**Standardized Data Format**
- Common Record structure for cross-service integration
- Includes metadata: timestamps, transaction IDs, source position
- JSON serialization support for streaming and messaging systems
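
The target and record ideas above can be sketched in plain Python. Every name
in this block (``SketchRecord``, ``SketchTarget``, ``save``, ``replicate``) is a
hypothetical stand-in chosen for illustration and is not the library's actual
``ITarget`` or Record API; the interfaces and targets pages describe the real
signatures. The sketch only shows the shape of the idea: a common record that
serializes to JSON, a filter by operation type, and fan-out to several targets.

.. code-block:: python

    import json
    from abc import ABC, abstractmethod
    from dataclasses import asdict, dataclass
    from typing import Any, Dict, Iterable, List, Optional, Set


    @dataclass
    class SketchRecord:
        """Hypothetical stand-in for the common record structure."""

        operation: str  # "INSERT", "UPDATE" or "DELETE"
        schema: str
        table: str
        data: Dict[str, Any]
        event_timestamp: int  # seconds since the epoch
        transaction_id: Optional[str] = None
        source_position: Optional[str] = None  # e.g. a BinLog file and offset

        def to_json(self) -> str:
            # Sorted keys keep the output stable for messaging systems.
            return json.dumps(asdict(self), sort_keys=True)


    class SketchTarget(ABC):
        """Hypothetical stand-in for ``ITarget``; the real interface may differ."""

        @abstractmethod
        def save(self, records: List[SketchRecord]) -> None:
            """Deliver the records to the destination."""


    class InMemoryTarget(SketchTarget):
        """Collects serialized records; a real target would write to a queue or DB."""

        def __init__(self) -> None:
            self.messages: List[str] = []

        def save(self, records: List[SketchRecord]) -> None:
            self.messages.extend(record.to_json() for record in records)


    def replicate(
        records: Iterable[SketchRecord],
        targets: Iterable[SketchTarget],
        allowed_operations: Set[str],
    ) -> int:
        """Send records whose operation is allowed to every target."""
        selected = [r for r in records if r.operation in allowed_operations]
        for target in targets:
            target.save(selected)
        return len(selected)


    records = [
        SketchRecord("INSERT", "shop", "orders", {"id": 1, "total": 9.5}, 1700000000),
        SketchRecord("DELETE", "shop", "orders", {"id": 1}, 1700000050),
    ]
    queue, warehouse = InMemoryTarget(), InMemoryTarget()
    sent = replicate(records, [queue, warehouse], {"INSERT", "UPDATE"})

Here the ``DELETE`` event is filtered out, so both targets receive the same
single JSON message for the ``INSERT``.
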
**Production-Ready Features**
- Built-in error handling and retry mechanisms
- Comprehensive logging for monitoring and debugging
- Optional event timestamp column for UPSERT/MERGE operations
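
One common reason to carry an event timestamp column is to make UPSERT
operations safe against late or replayed events: a change is applied only if it
is newer than the row already stored. The sketch below illustrates that idea
with the standard-library ``sqlite3`` module. The table, the column name
``event_ts``, and the helper ``apply_change`` are assumptions made for this
example and do not describe how core-cdc itself writes to a target.

.. code-block:: python

    import sqlite3

    # The WHERE clause makes the update a no-op when the incoming event is older
    # than (or as old as) the row that is already stored.
    UPSERT_SQL = """
    INSERT INTO customers (id, name, event_ts)
    VALUES (:id, :name, :event_ts)
    ON CONFLICT(id) DO UPDATE SET
        name = excluded.name,
        event_ts = excluded.event_ts
    WHERE excluded.event_ts > customers.event_ts
    """


    def apply_change(conn: sqlite3.Connection, row: dict) -> None:
        """Apply one captured change using the timestamp-guarded upsert."""
        conn.execute(UPSERT_SQL, row)


    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE customers (id INTEGER PRIMARY KEY, name TEXT, event_ts INTEGER)"
    )

    apply_change(conn, {"id": 1, "name": "Ana", "event_ts": 100})
    apply_change(conn, {"id": 1, "name": "Ana Maria", "event_ts": 200})
    # This event arrives late and carries an older timestamp, so it is ignored.
    apply_change(conn, {"id": 1, "name": "stale", "event_ts": 150})

    current = conn.execute(
        "SELECT name, event_ts FROM customers WHERE id = 1"
    ).fetchone()

After the three calls the stored row still holds the values from the event
with timestamp ``200``.
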
Installation
-------------------------------------------------------------------------------
Install from PyPI with pip or uv, or install a local checkout in editable mode for development:
.. code-block:: bash
pip install core-cdc
uv pip install core-cdc # Or using UV...
pip install -e ".[dev]" # For development...
Setting Up Environment
-------------------------------------------------------------------------------
1. Install required libraries:
.. code-block:: bash
pip install --upgrade pip
pip install virtualenv
2. Create Python virtual environment:
.. code-block:: bash
virtualenv --python=python3.12 .venv
3. Activate the virtual environment:
.. code-block:: bash
source .venv/bin/activate
Install packages
-------------------------------------------------------------------------------
.. code-block:: bash
pip install .
pip install -e ".[dev]"
Optional libraries
-------------------------------------------------------------------------------
.. code-block:: bash
pip install '.[all]' # MySQL + MongoDB
pip install '.[mysql]' # MySQL BinLog support
pip install '.[mongo]' # MongoDB Change Streams support
Check tests and coverage
-------------------------------------------------------------------------------
.. code-block:: bash
python manager.py run-tests # unit tests
python manager.py run-tests --test-type integration
python manager.py run-coverage
Functional Tests
-------------------------------------------------------------------------------
Functional tests execute against real database servers and are **not** run
automatically by ``pytest`` or ``tox``. They must be invoked explicitly after
the required Docker containers are running.
The quickest way to run both MySQL and MongoDB functional tests in one shot is
the helper script ``tests/functional/quick_test.sh``. It checks connectivity,
runs both test suites via ``python -m pytest``, and prints a metrics summary
from the JSON result files written by each test.
.. code-block:: bash
bash tests/functional/quick_test.sh
All connection parameters are overridable via environment variables (see the
table below). The script defaults match the Docker commands in the next section.
Alternatively, run the functional tests through ``manager.py``, using ``--pattern`` to select the test files:
.. code-block:: bash
python manager.py run-tests --test-type functional --pattern "*.py"
Environment variables accepted by the functional tests:
+---------------------------+--------------------+---------------------------------------+
| Variable                  | Default            | Description                           |
+===========================+====================+=======================================+
| ``HOST_TEST_MONGO``       | ``localhost``      | MongoDB host                          |
+---------------------------+--------------------+---------------------------------------+
| ``PORT_TEST_MONGO``       | ``27017``          | MongoDB port                          |
+---------------------------+--------------------+---------------------------------------+
| ``DATABASE_TEST_MONGO``   | ``test``           | MongoDB database name                 |
+---------------------------+--------------------+---------------------------------------+
| ``USER_TEST_MONGO``       | *(none)*           | MongoDB username (optional)           |
+---------------------------+--------------------+---------------------------------------+
| ``PASSWORD_TEST_MONGO``   | *(none)*           | MongoDB password (optional)           |
+---------------------------+--------------------+---------------------------------------+
| ``HOST_TEST_MYSQL``       | ``localhost``      | MySQL host                            |
+---------------------------+--------------------+---------------------------------------+
| ``DATABASE_TEST_MYSQL``   | ``tests``          | MySQL database name                   |
+---------------------------+--------------------+---------------------------------------+
| ``USER_TEST_MYSQL``       | ``root``           | MySQL user                            |
+---------------------------+--------------------+---------------------------------------+
| ``PASSWORD_TEST_MYSQL``   | ``mysql_password`` | MySQL password                        |
+---------------------------+--------------------+---------------------------------------+
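
The variables in the table can be resolved to connection settings with
``os.environ`` lookups that fall back to the documented defaults. The helper
names ``mongo_settings`` and ``mysql_settings`` are hypothetical and exist only
in this sketch; the functional tests may read the variables differently.

.. code-block:: python

    import os
    from typing import Any, Dict, Mapping


    def mongo_settings(env: Mapping[str, str] = os.environ) -> Dict[str, Any]:
        """Resolve MongoDB test settings, using the defaults from the table."""
        return {
            "host": env.get("HOST_TEST_MONGO", "localhost"),
            "port": int(env.get("PORT_TEST_MONGO", "27017")),
            "database": env.get("DATABASE_TEST_MONGO", "test"),
            # Credentials are optional, so a missing variable yields None.
            "user": env.get("USER_TEST_MONGO"),
            "password": env.get("PASSWORD_TEST_MONGO"),
        }


    def mysql_settings(env: Mapping[str, str] = os.environ) -> Dict[str, Any]:
        """Resolve MySQL test settings, using the defaults from the table."""
        return {
            "host": env.get("HOST_TEST_MYSQL", "localhost"),
            "database": env.get("DATABASE_TEST_MYSQL", "tests"),
            "user": env.get("USER_TEST_MYSQL", "root"),
            "password": env.get("PASSWORD_TEST_MYSQL", "mysql_password"),
        }

Passing a plain dictionary instead of ``os.environ`` makes the helpers easy to
check, for example ``mysql_settings({"USER_TEST_MYSQL": "cdc"})`` overrides only
the user and keeps the remaining defaults.
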
Spinning Up Local Servers with Docker
-------------------------------------------------------------------------------
**MongoDB Replica Set** (required for Change Streams):
.. code-block:: bash
docker network create mongoCluster
docker run -d --rm -p 27017:27017 --name mongo1 --network mongoCluster \
mongo:5 mongod --replSet myReplicaSet --bind_ip localhost,mongo1
docker run -d --rm -p 27018:27017 --name mongo2 --network mongoCluster \
mongo:5 mongod --replSet myReplicaSet --bind_ip localhost,mongo2
docker run -d --rm -p 27019:27017 --name mongo3 --network mongoCluster \
mongo:5 mongod --replSet myReplicaSet --bind_ip localhost,mongo3
docker exec -it mongo1 mongosh --eval "rs.initiate({
_id: \"myReplicaSet\",
members: [
{_id: 0, host: \"mongo1\"},
{_id: 1, host: \"mongo2\"},
{_id: 2, host: \"mongo3\"}
]
})"
Check cluster status:
.. code-block:: bash
docker ps
docker exec -it mongo1 mongosh --eval "rs.status()"
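
Once the replica set is up, a change stream can be opened from the host with
``pymongo`` to confirm that events are flowing. This is a minimal sketch and
not part of core-cdc: the helper names are hypothetical, it assumes ``pymongo``
is installed, and it connects directly to the member published on
``localhost:27017`` because the Docker-internal member names (``mongo1``,
``mongo2``, ``mongo3``) usually do not resolve from the host machine.

.. code-block:: python

    from typing import Any, Dict, Iterator


    def build_mongo_uri(host: str = "localhost", port: int = 27017) -> str:
        """Build a URI for a direct connection to one replica-set member."""
        # directConnection=true stops the driver from trying to reach the other
        # members through the host names stored in the replica set configuration.
        return f"mongodb://{host}:{port}/?directConnection=true"


    def watch_changes(uri: str, database: str = "test") -> Iterator[Dict[str, Any]]:
        """Yield raw change events for every collection in ``database``."""
        # Imported lazily so the URI helper works without pymongo installed.
        from pymongo import MongoClient

        client = MongoClient(uri)
        with client[database].watch() as stream:
            for change in stream:
                yield change


    uri = build_mongo_uri()

Iterating over ``watch_changes(uri)`` blocks until a write happens in the
database; each yielded document carries an ``operationType`` field such as
``insert``, ``update``, or ``delete``.
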
**MySQL** (binary logging is enabled by default in MySQL 8.0 and later, so the official image needs no extra configuration):
.. code-block:: bash
docker run \
--env=MYSQL_ROOT_PASSWORD=mysql_password \
--volume=/var/lib/mysql \
-p 3306:3306 \
--restart=no \
-d mysql:latest
Implemented CDC Engines
-------------------------------------------------------------------------------
The following database engines have CDC implementations:
**Fully Implemented**
**MySQL** - Binary Log (BinLog) based CDC
- Uses ``mysql-replication`` library
- Captures INSERT, UPDATE, DELETE operations
- Supports DDL events (CREATE, ALTER, DROP)
- Fallback mechanism for column name resolution
- See: ``core_cdc/processors/mysql/``
**MongoDB** - Change Streams based CDC
- Uses native MongoDB Change Streams
- Captures INSERT, UPDATE, DELETE operations
- Requires replica set configuration
- Real-time event streaming
- See: ``core_cdc/processors/mongo/``
**Planned / Documentation Only**
**MS SQL Server** and **Oracle** implementations are not yet included.
Reference guides and implementation templates are available in the documentation.
Contributing
-------------------------------------------------------------------------------
Contributions are welcome! Please:
1. Fork the repository
2. Create a feature branch
3. Write tests for new functionality
4. Ensure all tests pass: ``python manager.py run-tests`` (unit) and ``python manager.py run-tests --test-type integration``
5. Run linting: ``pylint core_cdc``
6. Run security checks: ``bandit -r core_cdc``
7. Submit a pull request
License
-------------------------------------------------------------------------------
This project is licensed under the MIT License. See the LICENSE file for details.
Links
-------------------------------------------------------------------------------
* **Documentation:** https://core-cdc.readthedocs.io/en/latest/
* **Repository:** https://gitlab.com/bytecode-solutions/core/core-cdc
* **Issues:** https://gitlab.com/bytecode-solutions/core/core-cdc/-/issues
* **Changelog:** https://gitlab.com/bytecode-solutions/core/core-cdc/-/blob/master/CHANGELOG.md
* **PyPI:** https://pypi.org/project/core-cdc/
Support
-------------------------------------------------------------------------------
For questions or support, please open an issue on GitLab or contact the maintainers.
Authors
-------------------------------------------------------------------------------
* **Alejandro Cora González** - *Initial work* - alek.cora.glez@gmail.com