Understanding CDC (Change Data Capture) synchronization principle in one article
Introduction to CDC (Change Data Capture)
Change Data Capture (CDC) is a technique for tracking row-level changes in a database (inserts, updates, deletes) and notifying other systems of those changes in the order they occurred. In disaster-recovery scenarios, CDC is mainly used to synchronize data between a primary and a backup database, enabling real-time replication from the primary to the backup.
source ----------> CDC ----------> sink
Apache SeaTunnel CDC
SeaTunnel CDC offers two types of data synchronization:
- Snapshot Read: Reads historical data from a table.
- Incremental Tracking: Reads incremental log changes from a table.
Lock-Free Snapshot Synchronization
The lock-free snapshot synchronization phase deserves emphasis because many existing CDC platforms, such as Debezium, may lock tables while synchronizing historical data. Snapshot reading is the process of synchronizing a database's historical data. The basic flow is as follows:
storage ----------> splitEnumerator ------ split ------> reader
                          ^                                |
                          |                                |
                          \-------------- report ---------/
Split Partitioning
The splitEnumerator (split distributor) partitions the table data into multiple splits based on specified fields (such as the table ID or a unique key) and a defined step size.
Parallel Processing
Each split is assigned to a different reader for parallel reading. A single reader will occupy one connection.
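As a sketch of the partitioning step above, a numeric split-key range can be chunked by a fixed step size. All class and method names here (SplitChunker, SplitInfo, chunkify) are invented for illustration and are not SeaTunnel's actual API:

```java
import java.util.ArrayList;
import java.util.List;

public class SplitChunker {
    // One snapshot split: the half-open key range [start, end).
    record SplitInfo(String splitId, long start, long end) {}

    // Partition the key range [min, max] into splits of a fixed step size.
    static List<SplitInfo> chunkify(String tableId, long min, long max, long step) {
        List<SplitInfo> splits = new ArrayList<>();
        int seq = 0;
        for (long start = min; start <= max; start += step) {
            long end = Math.min(start + step, max + 1);
            splits.add(new SplitInfo(tableId + "-" + seq++, start, end));
        }
        return splits;
    }

    public static void main(String[] args) {
        // A table with keys 1..1000 and a step of 300 yields 4 splits.
        chunkify("orders", 1, 1000, 300).forEach(System.out::println);
    }
}
```

Each resulting split can then be handed to a different reader for parallel processing.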
Event Feedback
After completing the read operation for a split, each reader reports its progress back to the splitEnumerator. The metadata for a split is as follows:
String splitId                # Split ID
TableId tableId               # Table ID
SeaTunnelRowType splitKeyType # Type of the field used for partitioning
Object splitStart             # Start point of the split's key range
Object splitEnd               # End point of the split's key range
Once a reader receives its split information, it generates the corresponding SQL statements. Before starting, it records the split's current position in the database log (the low watermark). After completing the split, the reader reports its progress to the splitEnumerator with the following data:
String splitId # Split ID
Offset highWatermark # Log position corresponding to the split, for future validation
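The read step can be illustrated with a toy query builder (identifiers here are assumptions, not SeaTunnel code): each reader issues a bounded range query over its split's key range, so readers working on different splits never overlap.

```java
public class SplitSqlBuilder {
    // Build a range query covering exactly [splitStart, splitEnd).
    static String buildQuery(String tableId, String splitKey, Object start, Object end) {
        return "SELECT * FROM " + tableId
                + " WHERE " + splitKey + " >= " + start
                + " AND " + splitKey + " < " + end;
    }

    public static void main(String[] args) {
        System.out.println(buildQuery("orders", "id", 1, 301));
        // → SELECT * FROM orders WHERE id >= 1 AND id < 301
    }
}
```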
Incremental Synchronization
The incremental synchronization phase begins after the snapshot read phase. In this stage, any changes occurring in the source database are captured and synchronized to the backup database in real-time. This phase listens to the database log (e.g., MySQL binlog). Incremental tracking is usually single-threaded to avoid duplicate pulls of the binlog and reduce database load. Therefore, only one reader is used, occupying a single connection.
data log ----------> splitEnumerator ------ split ------> reader
                           ^                                |
                           |                                |
                           \-------------- report ---------/
In the incremental synchronization phase, all splits and tables from the snapshot phase are combined into a single split. The split metadata during this phase is as follows:
String splitId
Offset startingOffset # The lowest log start position among all splits
Offset endingOffset # Log end position, or "continuous" if ongoing, e.g., in the incremental phase
List<TableId> tableIds
Map<TableId, Offset> tableWatermarks # Watermark for all splits
List<CompletedSnapshotSplitInfo> completedSnapshotSplitInfos # Snapshot phase split details
The fields of CompletedSnapshotSplitInfo are as follows:
String splitId
TableId tableId
SeaTunnelRowType splitKeyType
Object splitStart
Object splitEnd
Offset watermark # Corresponds to the highWatermark in the report
The split in the incremental phase contains the watermarks of all splits from the snapshot phase; the minimum watermark is selected as the starting point for incremental synchronization.
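Selecting the starting point can be sketched as follows. The class and field names (Offset as a single position, CompletedSnapshotSplitInfo with only two fields) are simplified stand-ins, not SeaTunnel's real types:

```java
import java.util.List;

public class StartOffsetPicker {
    // Simplified stand-in for a log offset: one monotonically increasing position.
    record Offset(long position) {}
    record CompletedSnapshotSplitInfo(String splitId, Offset watermark) {}

    // The lowest high watermark among all completed splits is the only safe
    // place to resume: any log entry after it may affect some split.
    static Offset startingOffset(List<CompletedSnapshotSplitInfo> infos) {
        return infos.stream()
                .map(CompletedSnapshotSplitInfo::watermark)
                .min((a, b) -> Long.compare(a.position(), b.position()))
                .orElseThrow();
    }

    public static void main(String[] args) {
        List<CompletedSnapshotSplitInfo> infos = List.of(
                new CompletedSnapshotSplitInfo("s1", new Offset(120)),
                new CompletedSnapshotSplitInfo("s2", new Offset(95)),
                new CompletedSnapshotSplitInfo("s3", new Offset(140)));
        System.out.println(startingOffset(infos)); // the lowest watermark wins
    }
}
```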
Exactly-Once Semantics
Whether in the snapshot read or the incremental read phase, the database may continue to change while synchronization is in progress. How do we guarantee exactly-once delivery?
Snapshot Read Phase
In the snapshot read phase, suppose a split is being synchronized while changes happen concurrently: a row k3 is inserted, k2 is updated, and k1 is deleted. If the read takes no account of these changes, the updates could be lost. SeaTunnel handles this as follows:
- First, record the current binlog position (the low watermark) before reading the split.
- Read the data in the range split{start, end}.
- Record the binlog position again (the high watermark) after reading.
If high == low, the split's data did not change during the read. If (high - low) > 0, changes occurred while the split was being processed. In that case, SeaTunnel will:
- Cache the split's data in memory as an in-memory table.
- Apply the changes between the low watermark and the high watermark in order, using primary keys to replay the operations on the in-memory table.
- Report the high watermark.
              insert k3          update k2          delete k1
                  |                  |                  |
                  v                  v                  v
bin log --|---------------------------------------------------|-- log offset
     low watermark                                    high watermark

CDC reads: k1 k3 k4
               | replays
               v
Real data: k2 k3' k4
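The replay step can be sketched as below, assuming a binlog event is modeled as (op, key, value); all names are illustrative. Events between the low and high watermark are applied to the in-memory copy of the split, keyed by primary key, before the split is emitted:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SplitReplayer {
    enum Op { INSERT, UPDATE, DELETE }
    record LogEvent(Op op, String key, String value) {}

    // Replay log events onto the in-memory table by primary key.
    static Map<String, String> replay(Map<String, String> snapshot, List<LogEvent> events) {
        Map<String, String> table = new LinkedHashMap<>(snapshot);
        for (LogEvent e : events) {
            switch (e.op()) {
                case INSERT, UPDATE -> table.put(e.key(), e.value()); // upsert
                case DELETE -> table.remove(e.key());
            }
        }
        return table;
    }

    public static void main(String[] args) {
        // Snapshot read saw k1, k3, k4; the log between the low and high
        // watermark inserted k3, updated k2, and deleted k1.
        Map<String, String> snapshot = new LinkedHashMap<>();
        snapshot.put("k1", "v1");
        snapshot.put("k3", "v3");
        snapshot.put("k4", "v4");
        List<LogEvent> events = List.of(
                new LogEvent(Op.INSERT, "k3", "v3"),
                new LogEvent(Op.UPDATE, "k2", "v2'"),
                new LogEvent(Op.DELETE, "k1", null));
        System.out.println(replay(snapshot, events)); // {k3=v3, k4=v4, k2=v2'}
    }
}
```

Note that UPDATE is applied as an upsert: an update to k2, which the snapshot read never saw, still lands in the result.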
Incremental Phase
Before starting the incremental phase, SeaTunnel first validates all splits from the previous step. Data may change between splits: for instance, records inserted between split1 and split2 would be missed during the snapshot phase. To recover such data, SeaTunnel proceeds as follows:
- From all split reports, find the smallest watermark as the start watermark to begin reading the log.
- For each log entry read, check completedSnapshotSplitInfos to see whether the data has already been processed in some split. If not, it is treated as data between splits and is replayed as a correction.
- Once all splits have been validated, the process moves to the pure incremental phase.
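The validation check can be sketched as follows (the names and the flat long-valued keys and offsets are assumptions for illustration): a log entry is skipped only if some completed snapshot split covers its key and the entry lies at or before that split's watermark; otherwise it must be replayed.

```java
import java.util.List;

public class SplitFilter {
    // A completed snapshot split: key range [splitStart, splitEnd) plus
    // the high watermark reported for it.
    record CompletedSplit(long splitStart, long splitEnd, long watermark) {}

    static boolean alreadyProcessed(long key, long offset, List<CompletedSplit> splits) {
        for (CompletedSplit s : splits) {
            boolean inKeyRange = key >= s.splitStart() && key < s.splitEnd();
            if (inKeyRange && offset <= s.watermark()) {
                return true; // this change was folded in during snapshot replay
            }
        }
        return false; // missed data: replay it as a correction
    }

    public static void main(String[] args) {
        List<CompletedSplit> splits = List.of(
                new CompletedSplit(0, 100, 50),    // split1
                new CompletedSplit(100, 200, 80)); // split2
        // Change to key 150 at offset 60: inside split2, before its watermark.
        System.out.println(alreadyProcessed(150, 60, splits)); // true → skip
        // Change to key 150 at offset 90: after split2's watermark.
        System.out.println(alreadyProcessed(150, 90, splits)); // false → replay
    }
}
```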
          |----------------filter split2----------------|
          |----filter split1----|
data log -|---------------------|-----------------------|------------------|- log offset
     min watermark       split1 watermark        split2 watermark    max watermark
Checkpoint and Resume
What about pausing and resuming CDC? SeaTunnel uses a distributed snapshot algorithm (Chandy-Lamport):
Assume the system has two processes, p1 and p2, where p1 has three variables X1, Y1, Z1 and p2 has three variables X2, Y2, Z2. The initial states are as follows:

p1          p2
X1:0        X2:4
Y1:0        Y2:2
Z1:0        Z2:3
At this point, p1 initiates a global snapshot. p1 first records its own process state, then sends a marker to p2. Before the marker reaches p2, p2 sends message M to p1.

p1                          p2
X1:0 -------marker------->  X2:4
Y1:0 <---------M----------  Y2:2
Z1:0                        Z2:3
Upon receiving the marker, p2 records its state, and p1 then receives message M. Since p1 has already taken its local snapshot, it only needs to record message M as channel state. The final snapshot looks like this:

p1           M           p2
X1:0                     X2:4
Y1:0                     Y2:2
Z1:0                     Z2:3
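The channel-recording step of the example above can be sketched as a toy (all names invented, not SeaTunnel code): after a process records its local state and emits a marker, every message arriving on an incoming channel before that channel's marker is recorded as in-flight channel state.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class ChandyLamportSketch {
    static final String MARKER = "<marker>";

    // Drain the incoming channel until the peer's marker arrives; every
    // message seen before it belongs to the snapshot's channel state.
    static List<String> recordChannelState(Queue<String> channel) {
        List<String> inFlight = new ArrayList<>();
        String msg;
        while (!(msg = channel.poll()).equals(MARKER)) {
            inFlight.add(msg);
        }
        return inFlight;
    }

    public static void main(String[] args) {
        // p1 has already recorded X1, Y1, Z1 locally and sent its marker;
        // meanwhile p2 sent message M before echoing the marker back.
        Queue<String> p2ToP1 = new ArrayDeque<>(List.of("M", MARKER));
        System.out.println(recordChannelState(p2ToP1)); // [M]
    }
}
```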
In SeaTunnel CDC, markers are sent to all readers, split enumerators, writers, and other nodes, and each node saves its own in-memory state when the marker arrives.