[Demo 2] Sync Data from MySQL CDC to Doris with Apache SeaTunnel

Apache SeaTunnel
May 24, 2024

Editor | Debra Chen

As data technology advances rapidly, mastering the tools and techniques behind it becomes increasingly important. To this end, the Apache SeaTunnel community is launching a demo program on how to use connectors, and we invite everyone who is passionate about data synchronization to share their knowledge and practical experience!

In the last issue, we invited Gao Jun, a PMC member of the community, to record a video demo on how to sync data from MySQL to Doris. This time, Gao Jun recorded a demo showing how to sync data from MySQL CDC to Doris. Follow the guide and you can do it in minutes, too.

If you are interested in this program, feel free to email me (xiyan@whaleops.com) to take part in the demo recordings! Whether you are a data engineer, developer, or technology enthusiast, you are welcome to showcase your technical talents.

Highlight: If you are a user and would like to see a demo of a particular synchronization scenario, please scroll down to the bottom and leave a comment. We will prioritize the scenarios with the highest demand!

Demo Goals

Our goal is to create a platform for sharing and learning that helps community members better understand and apply the various data connectors through concrete demos and accompanying documentation. These demos help beginners learn quickly and also give senior experts a stage to showcase innovative solutions.

This tutorial provides a comprehensive explanation of configuring and utilizing the MySQL CDC (Change Data Capture) connector, which enables streamlined retrieval of both snapshot and incremental data from MySQL databases through SeaTunnel Zeta and Flink.

Utilizing the MySQL CDC connector for effective data synchronization

Learn how to use the MySQL CDC connector to retrieve both snapshot and incremental data from a MySQL database, and how to process that data efficiently with SeaTunnel Zeta and Flink.

The sample configuration used in the video:

env {
  # Run as a continuous streaming job so binlog changes keep flowing
  job.mode = "STREAMING"
  parallelism = 1
}

source {
  MySQL-CDC {
    base-url = "jdbc:mysql://datasource01:3306/qa_source"
    username = "root"
    password = "root@123"

    # Tables to capture, written as database.table
    table-names = ["qa_source.batch_mysql_to_doris", "qa_source.batch_mysql_to_doris_offline_incremental_where"]
    # "latest" skips the snapshot and reads only new binlog events
    startup.mode = "latest"
  }
}

sink {
  Doris {
    fenodes = "datasource01:8034"
    query-port = 9034
    username = "root"
    password = "root@123"
    # Drop and recreate the target table when the job starts
    schema_save_mode = "RECREATE_SCHEMA"
    database = "e2e_sink"
    # ${table_name} is replaced with the name of each source table
    table = "${table_name}_from_mysql"
    sink.enable-2pc = "true"
    sink.enable-delete = "true"
    sink.label-prefix = "test_json"
    doris.config = {
      format = "json"
      read_json_by_line = "true"
    }
  }
}
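
To make the table = "${table_name}_from_mysql" setting more concrete, here is a minimal Python sketch of how such a placeholder could expand for each captured table. It assumes that ${table_name} stands for the source table name without its database prefix. The helper sink_table_for is hypothetical and only illustrates the resulting names; it is not SeaTunnel's implementation.

```python
from string import Template

# Sink table pattern copied from the Doris sink configuration above.
SINK_TABLE_PATTERN = "${table_name}_from_mysql"


def sink_table_for(source_table: str, pattern: str = SINK_TABLE_PATTERN) -> str:
    """Hypothetical helper: expand the pattern for one 'database.table' entry."""
    # Assumption: ${table_name} is the table part only, without the database.
    table_name = source_table.split(".", 1)[-1]
    return Template(pattern).substitute(table_name=table_name)


source_tables = [
    "qa_source.batch_mysql_to_doris",
    "qa_source.batch_mysql_to_doris_offline_incremental_where",
]
sink_tables = [sink_table_for(t) for t in source_tables]
```

Under that assumption, each source table gets its own Doris table in e2e_sink, which is why a single sink block can serve a multi-table source.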

Supported engines

  • SeaTunnel Zeta
  • Flink

Key features

Supported Data Source

Dependency installation

Install JDBC driver

  • For the Flink engine
    Make sure that the JDBC driver package is placed in the ${SEATUNNEL_HOME}/plugins/ directory.
  • For the SeaTunnel Zeta engine
    Make sure that the JDBC driver package is placed in the ${SEATUNNEL_HOME}/lib/ directory.
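
Before starting a job, it can help to confirm that the driver is where the engine expects it. The following is a minimal Python sketch of such a check. The function name find_jdbc_drivers and the assumption that the jar file name starts with "mysql-connector" are illustrative and not part of SeaTunnel.

```python
from pathlib import Path

# Directory (relative to SEATUNNEL_HOME) where each engine expects the driver,
# as described in the list above.
DRIVER_DIRS = {"flink": "plugins", "zeta": "lib"}


def find_jdbc_drivers(seatunnel_home: str, engine: str) -> list[str]:
    """Return the names of MySQL driver jars found for the given engine."""
    driver_dir = Path(seatunnel_home) / DRIVER_DIRS[engine]
    if not driver_dir.is_dir():
        return []
    # Assumption: the jar file name starts with "mysql-connector".
    return sorted(p.name for p in driver_dir.glob("mysql-connector*.jar"))
```

For example, find_jdbc_drivers(os.environ["SEATUNNEL_HOME"], "zeta") returns an empty list when the driver still needs to be copied into lib/.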

Create a MySQL user

You must define a MySQL user that has the appropriate permissions on all databases the Debezium MySQL connector monitors.

1. Create a MySQL user:

mysql> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';

2. Grant the required permissions to the account created in step 1 (MySQL 8.0 no longer accepts an IDENTIFIED BY clause in GRANT):

mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user'@'localhost';

3. Finalize the user's permissions:

mysql> FLUSH PRIVILEGES;

Enable MySQL Binlog

Binary logging must be enabled for MySQL replication. The binary log records transaction updates so that replication tools can propagate the changes.

1. Check if the log-bin option is enabled:

mysql> show variables where variable_name in ('log_bin', 'binlog_format', 'binlog_row_image', 'gtid_mode', 'enforce_gtid_consistency');
+--------------------------+-------+
| Variable_name            | Value |
+--------------------------+-------+
| binlog_format            | ROW   |
| binlog_row_image         | FULL  |
| enforce_gtid_consistency | ON    |
| gtid_mode                | ON    |
| log_bin                  | ON    |
+--------------------------+-------+
5 rows in set (0.00 sec)

2. If the output does not match the values above, set the following properties in your MySQL server configuration file ($MYSQL_HOME/mysql.cnf):

# Enable binary replication log and set the prefix, expiration, and log format.
# The prefix is arbitrary, expiration can be short for integration tests but would
# be longer on a production system. Row-level info is required for ingest to work.
# Server ID is required, but this will vary on production systems
server-id = 223344
log_bin = mysql-bin
expire_logs_days = 10
binlog_format = row
binlog_row_image = FULL

# enable gtid mode
gtid_mode = on
enforce_gtid_consistency = on

3. Restart the MySQL server:

/etc/init.d/mysqld restart

4. Confirm your changes by double-checking the binlog status:

mysql> show variables where variable_name in ('log_bin', 'binlog_format', 'binlog_row_image', 'gtid_mode', 'enforce_gtid_consistency');
+--------------------------+-------+
| Variable_name            | Value |
+--------------------------+-------+
| binlog_format            | ROW   |
| binlog_row_image         | FULL  |
| enforce_gtid_consistency | ON    |
| gtid_mode                | ON    |
| log_bin                  | ON    |
+--------------------------+-------+
5 rows in set (0.00 sec)
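
The same check can be automated. The following minimal Python sketch treats the values in the output above as the target and reports every variable that deviates. The function name binlog_problems and the sample rows are illustrative; in practice the (Variable_name, Value) pairs would come from running the query through whatever MySQL client library you use.

```python
# Target values taken from the expected output above (compared case-insensitively).
REQUIRED = {
    "log_bin": "ON",
    "binlog_format": "ROW",
    "binlog_row_image": "FULL",
    "gtid_mode": "ON",
    "enforce_gtid_consistency": "ON",
}


def binlog_problems(variables: dict[str, str]) -> list[str]:
    """Return one message per variable that is missing or has the wrong value."""
    problems = []
    for name, expected in REQUIRED.items():
        actual = variables.get(name)
        if actual is None or actual.upper() != expected:
            problems.append(f"{name}: expected {expected}, got {actual}")
    return problems


# Sample input: rows as (Variable_name, Value) pairs with one wrong setting.
rows = [
    ("binlog_format", "MIXED"),
    ("binlog_row_image", "FULL"),
    ("enforce_gtid_consistency", "ON"),
    ("gtid_mode", "ON"),
    ("log_bin", "ON"),
]
problems = binlog_problems(dict(rows))
```

An empty list means the server matches the expected settings. Any entry points to a property that still has to be changed in the configuration file.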

Note:

  • Set the MySQL session timeout

For large databases, established connections may time out while the initial consistent snapshot is being taken. You can prevent this by raising interactive_timeout and wait_timeout.

interactive_timeout: The number of seconds the server waits for activity before closing an interactive connection.

wait_timeout: The number of seconds the server waits for activity before closing a non-interactive connection.
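
As a rough illustration, the Python sketch below renders the SET GLOBAL statements that raise both timeouts to the same value. The helper name timeout_statements is hypothetical and the value you pass is your own choice; it should comfortably exceed the time your initial snapshot needs. Note that a global change applies only to connections opened afterwards.

```python
def timeout_statements(seconds: int) -> list[str]:
    """Build SET GLOBAL statements that raise both session timeouts."""
    if seconds <= 0:
        raise ValueError("timeout must be a positive number of seconds")
    return [
        f"SET GLOBAL interactive_timeout = {seconds};",
        f"SET GLOBAL wait_timeout = {seconds};",
    ]


# Example: allow connections to stay idle for up to one day.
statements = timeout_statements(86400)
```

To keep the values across server restarts, set the same two variables in the MySQL configuration file as well.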

Data type mapping

Source Option

Task example

  • Simple example: support for multi-table reading

env {
  parallelism = 1
  job.mode = "STREAMING"
  # Checkpoint interval in milliseconds
  checkpoint.interval = 10000
}

source {
  MySQL-CDC {
    base-url = "jdbc:mysql://localhost:3306/testdb"
    username = "root"
    password = "root@123"
    table-names = ["testdb.table1", "testdb.table2"]

    # "initial" takes a snapshot first, then continues with the binlog
    startup.mode = "initial"
  }
}

sink {
  # Print every captured row to the console
  Console {
  }
}

As the technology evolves, we will continue to refine the documentation and features, so stay tuned for future updates. The MySQL CDC connector described in this article lets developers capture real-time change data from a MySQL database, so that data can be synchronized and analyzed without disrupting database operations.

This not only improves the efficiency of data processing but also provides powerful data support for building real-time data applications. Whether you’re a data engineer or a system architect, knowing how to configure and use the MySQL CDC connector will be a great benefit for you.
