Harnessing the Power of SeaTunnel: A Guide to Live Sync from MySQL to ES
Preamble
Recently, our project needed some tables synchronized in real time from one MySQL instance to another, and others synchronized to Elasticsearch. In production, our company currently uses Aliyun's Data Transmission Service (DTS) for this. Each synchronization task costs over 500 yuan per month, which is a bit pricey.
In other environments, we use CloudCanal for MySQL-to-Elasticsearch synchronization. It doesn't support data transformation, adding synchronized fields is cumbersome, and the community edition's limit of five tasks is not enough for us. For MySQL-to-MySQL synchronization we use Debezium, which cannot write to Elasticsearch.
Three years ago I happened to use Waterdrop, the predecessor of SeaTunnel, so I decided to give SeaTunnel a try. This article uses SeaTunnel 2.3.1 on Ubuntu as an example.
Open Source Data Integration Platform SeaTunnel
Introduction
SeaTunnel is a high-performance, open-source big data integration tool under the Apache Software Foundation. It provides a flexible, easy-to-use, and easily extensible data integration solution capable of handling hundreds of billions of records.
SeaTunnel provides high-performance data synchronization capabilities for real-time (CDC) and batch data, supporting over a hundred data sources (https://seatunnel.apache.org/docs/2.3.1/Connector-v2-release-state/). It is already in use by hundreds of companies, including Bilibili, Tencent Cloud, and ByteDance.
You can choose to run it on the SeaTunnel Zeta engine, or on the Apache Flink or Spark engines.
Installation
Download version 2.3.1 (https://seatunnel.apache.org/download/) and extract it with tar -xzvf apache-seatunnel-*.tar.gz.
Avoid version 2.3.2: it has a bug where MySQL-CDC can't find the JDBC driver (https://github.com/apache/seatunnel/pull/4945/files), which produces the following error:
Caused by: java.sql.SQLException: No suitable driver
at java.sql/java.sql.DriverManager.getDriver(DriverManager.java:298)
at com.zaxxer.hikari.util.DriverDataSource.<init>(DriverDataSource.java:106)
... 20 more
... 11 more
at org.apache.seatunnel.engine.client.job.ClientJobProxy.waitForJobComplete(ClientJobProxy.java:122)
at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:181)
Installing Connector Plugins
Run bash bin/install-plugin.sh. If you are in China, configure a domestic Maven mirror first; otherwise the download may be slow or fail.
The official documentation says to run sh bin/install-plugin.sh, but on Ubuntu 20.04.2 LTS this fails with bin/install-plugin.sh: 54: Bad substitution. Ubuntu's /bin/sh is dash, which does not support the bash-specific syntax the script uses, so invoke it with bash instead. I have submitted a PR to fix the documentation (https://github.com/apache/seatunnel-website/pull/253).
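For the Maven mirror mentioned above, here is a minimal sketch that generates a ~/.m2/settings.xml pointing at Aliyun's public mirror. It assumes install-plugin.sh downloads connectors through Maven, which reads that file. The mirror id aliyunmaven and the URL follow Aliyun's published instructions; build_settings and write_settings are hypothetical helper names, and mirrorOf "*" (route every repository through the mirror) is a choice you may want to narrow.

```python
"""Sketch: build a minimal Maven settings.xml with a single mirror entry.
Assumption: the plugin installer downloads through Maven and honors ~/.m2/settings.xml."""
import xml.etree.ElementTree as ET
from pathlib import Path

SETTINGS_NS = "http://maven.apache.org/SETTINGS/1.0.0"
# Serialize the settings namespace as the default xmlns instead of an ns0: prefix.
ET.register_namespace("", SETTINGS_NS)


def build_settings(mirror_id: str, url: str, mirror_of: str = "*") -> str:
    """Return settings.xml text containing one <mirror> element."""
    root = ET.Element(f"{{{SETTINGS_NS}}}settings")
    mirrors = ET.SubElement(root, f"{{{SETTINGS_NS}}}mirrors")
    mirror = ET.SubElement(mirrors, f"{{{SETTINGS_NS}}}mirror")
    for tag, text in (("id", mirror_id), ("mirrorOf", mirror_of), ("url", url)):
        ET.SubElement(mirror, f"{{{SETTINGS_NS}}}{tag}").text = text
    return ET.tostring(root, encoding="unicode")


def write_settings(xml_text: str, path: Path = Path.home() / ".m2" / "settings.xml") -> None:
    """Write the file, refusing to clobber an existing settings.xml."""
    if path.exists():
        raise FileExistsError(f"{path} already exists; merge the <mirror> entry by hand")
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(xml_text, encoding="utf-8")


if __name__ == "__main__":
    # Print for inspection; call write_settings(...) yourself once it looks right.
    print(build_settings("aliyunmaven", "https://maven.aliyun.com/repository/public"))
```

If you already have a settings.xml, copy only the <mirror> element into its <mirrors> section rather than overwriting the file.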
Writing the Configuration File
Create a new configuration file in the config directory, such as mysql-es-test.conf, and add the env block. Because we are synchronizing in real time, job.mode is set to "STREAMING"; execution.parallelism sets the number of parallel tasks.
env {
  # You can set flink configuration here
  execution.parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 2000    # checkpoint every 2000 ms
  #execution.checkpoint.interval = 10000
  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}
Real-time synchronization from MySQL requires the binlog to be enabled with row-based logging (https://debezium.io/documentation/reference/1.6/connectors/mysql.html#setting-up-mysql).
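Before starting the job it can help to confirm the server settings. A minimal sketch, assuming you have already fetched the rows of SHOW VARIABLES with whatever MySQL client you use: it checks the three settings the Debezium setup guide asks for (log_bin on, binlog_format = ROW, binlog_row_image = FULL). binlog_problems and from_rows are hypothetical names, not part of SeaTunnel or Debezium.

```python
"""Sketch: compare MySQL server variables with the binlog settings CDC needs.
Assumption: rows come from SHOW VARIABLES as (Variable_name, Value) pairs."""
from typing import Iterable, Mapping

# Values required by the Debezium MySQL setup guide; compared case-insensitively.
REQUIRED = {
    "log_bin": "ON",
    "binlog_format": "ROW",
    "binlog_row_image": "FULL",
}


def from_rows(rows: Iterable[tuple[str, str]]) -> dict[str, str]:
    """Turn (Variable_name, Value) rows into a name -> value dict."""
    return {name: value for name, value in rows}


def binlog_problems(variables: Mapping[str, str]) -> list[str]:
    """Return human-readable problems; an empty list means the settings look fine."""
    problems = []
    for name, expected in REQUIRED.items():
        actual = variables.get(name)
        if actual is None:
            problems.append(f"{name} is not reported by the server")
        elif actual.upper() != expected:
            problems.append(f"{name} = {actual}, expected {expected}")
    return problems
```

Changing binlog_format or enabling log_bin usually needs a server configuration change and restart, so it is worth checking this before the first run rather than debugging a failed job.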
Add the source configuration (https://seatunnel.apache.org/docs/2.3.1/connector-v2/source/MySQL-CDC#options). result_table_name is a temporary table name that later blocks use to refer to this data. table-names entries must be in database.table form, and base-url must include the database name. server-id identifies this client to MySQL as a replica, so it must not clash with any other replica or CDC client of the same server. The default startup.mode is INITIAL, which first synchronizes the existing (historical) data and then switches to incremental synchronization.
source {
  MySQL-CDC {
    result_table_name = "t1"
    server-id = 5656
    username = "root"
    password = "pwd"
    table-names = ["db.t1"]
    base-url = "jdbc:mysql://host:3306/db"
  }
}
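The rules above about table-names and base-url are easy to check before launching a job. A minimal sketch; check_source is a hypothetical helper, not part of SeaTunnel, and it only covers those two rules (it does not connect to MySQL).

```python
"""Sketch: validate MySQL-CDC table-names and base-url against the two rules above."""
from urllib.parse import urlparse


def check_source(table_names: list[str], base_url: str) -> list[str]:
    """Return a list of problems; empty means both rules are satisfied."""
    problems = []
    for name in table_names:
        parts = name.split(".")
        # Exactly one dot, with a non-empty database and table on either side.
        if len(parts) != 2 or not all(parts):
            problems.append(f"table-names entry {name!r} is not database.table")
    if not base_url.startswith("jdbc:mysql://"):
        problems.append("base-url should start with jdbc:mysql://")
    else:
        # Drop the "jdbc:" prefix so urlparse sees mysql://host:port/db?params.
        path = urlparse(base_url[len("jdbc:"):]).path
        if not path.strip("/"):
            problems.append("base-url does not name a database, e.g. jdbc:mysql://host:3306/db")
    return problems
```

For the configuration above, check_source(["db.t1"], "jdbc:mysql://host:3306/db") returns an empty list.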
Add the transform configuration. The Sql transform is used here because SQL is flexible; for the list of supported functions, see (https://seatunnel.apache.org/docs/2.3.1/transform-v2/sql-functions).
transform {
  Sql {
    source_table_name = "t1"
    query = "SELECT id, alias_name aliasName FROM t1 WHERE c1 = '1'"
  }
}
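To make the query's effect concrete, here is a plain-Python emulation of it on rows represented as dicts: keep rows where c1 = '1', then output only id and alias_name renamed to aliasName. This illustrates the SQL semantics only and is not how SeaTunnel executes the transform; transform is a hypothetical function name.

```python
"""Sketch: the Sql transform's query, emulated row by row in plain Python."""
from typing import Iterable, Iterator


def transform(rows: Iterable[dict]) -> Iterator[dict]:
    for row in rows:
        # WHERE c1 = '1' (a missing/NULL c1 is filtered out, as in SQL)
        if row.get("c1") == "1":
            # SELECT id, alias_name aliasName
            yield {"id": row["id"], "aliasName": row["alias_name"]}
```

Note that c1 is used for filtering but is not part of the output, so it will not appear in the Elasticsearch documents.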
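Add the sink configuration (https://seatunnel.apache.org/docs/2.3.1/connector-v2/sink/Elasticsearch#options). For real-time CDC synchronization to Elasticsearch, primary_keys must be configured: the sink uses these fields to generate each document's _id, so updates and deletes reach the right document.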
sink {
  Elasticsearch {
    hosts = ["host:9200"]
    username = "elastic"
    password = "pwd"
    index = "index_t1"
    # cdc required options
    primary_keys = ["id"]
  }
}
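Why primary_keys matters for CDC can be shown with a small simulation. A minimal sketch: change events are applied to an in-memory dict standing in for the index, with each document _id built from the primary-key values joined by "_" (check key_delimiter in the sink docs for the actual default). The event kinds are named after SeaTunnel's row kinds, but treating UPDATE_BEFORE and DELETE as deletes and INSERT and UPDATE_AFTER as upserts is an assumption of this sketch; doc_id and apply_events are hypothetical names.

```python
"""Sketch: apply CDC events to a dict-based stand-in for an Elasticsearch index.
Without a stable _id derived from primary keys, updates would create duplicate
documents and deletes would have nothing to target."""
from typing import Iterable


def doc_id(row: dict, primary_keys: list[str], delimiter: str = "_") -> str:
    """Build the document _id from the primary-key values, in key order."""
    return delimiter.join(str(row[k]) for k in primary_keys)


def apply_events(events: Iterable[tuple[str, dict]], primary_keys: list[str]) -> dict[str, dict]:
    index: dict[str, dict] = {}
    for kind, row in events:
        _id = doc_id(row, primary_keys)
        if kind in ("INSERT", "UPDATE_AFTER"):
            index[_id] = row          # upsert: the same _id overwrites the old document
        elif kind in ("UPDATE_BEFORE", "DELETE"):
            index.pop(_id, None)      # remove the old image by _id
    return index
```

Removing the old image on UPDATE_BEFORE also keeps the index correct if a row's primary key itself changes.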
The final configuration file, config/mysql-es-test.conf, is the env, source, transform, and sink blocks above combined.
Starting the Task
This example runs in local mode (https://seatunnel.apache.org/docs/2.3.1/seatunnel-engine/local-mode). Other options include cluster mode (https://seatunnel.apache.org/docs/2.3.1/seatunnel-engine/deployment) and running on the Spark or Flink engines.
./bin/seatunnel.sh -e local --config ./config/mysql-es-test.conf
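If you start jobs from a deployment script, the same command can be wrapped in Python. A minimal sketch that only reproduces the local-mode command above; build_command and start are hypothetical helpers, and the SeaTunnel home directory in the usage note is an assumed path.

```python
"""Sketch: launch the local-mode job shown above from Python."""
import subprocess
from pathlib import Path


def build_command(config: str) -> list[str]:
    """Argument list equivalent to: ./bin/seatunnel.sh -e local --config <config>"""
    return ["./bin/seatunnel.sh", "-e", "local", "--config", config]


def start(seatunnel_home: Path, config: str) -> subprocess.Popen:
    """Start the job from the SeaTunnel home directory without waiting for it.

    Popen is used instead of run() because a STREAMING job does not exit on its own.
    """
    return subprocess.Popen(build_command(config), cwd=seatunnel_home)
```

For example, proc = start(Path("/opt/apache-seatunnel-2.3.1"), "./config/mysql-es-test.conf") starts the job, and proc.terminate() stops it.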
Conclusion
SeaTunnel makes real-time synchronization from MySQL to Elasticsearch and other targets easy. It is free, and adding synchronized fields only takes a configuration change. For more features, see the official documentation (https://seatunnel.apache.org/docs/2.3.1/about).
Newer versions ship with a built-in synchronization engine (https://seatunnel.apache.org/docs/2.3.1/seatunnel-engine/about), which removes the dependency on Spark or Flink and reduces deployment complexity for small-volume synchronization scenarios.
Newer versions also provide a web UI (https://github.com/apache/seatunnel-web), although it currently depends heavily on the Apache DolphinScheduler scheduling platform.