Harnessing the Power of SeaTunnel: A Guide to Live Sync from MySQL to ES

Apache SeaTunnel
Aug 1, 2023

Preamble

Recently, our project needed to synchronize a few tables in real time from MySQL to another MySQL instance, and some others to Elasticsearch. In production, our company currently uses Aliyun’s Data Transmission Service (DTS) for synchronization. Each synchronization task costs over 500 yuan per month, which is a bit pricey.

In other environments, we use CloudCanal for MySQL-to-Elasticsearch synchronization. It doesn’t support data conversion, adding synchronized fields is cumbersome, and the community edition limits us to five tasks, which is not enough. For MySQL-to-MySQL synchronization we use Debezium, which does not support writing to Elasticsearch.

Coincidentally, three years ago I used Waterdrop, the predecessor of SeaTunnel. So let’s get started with SeaTunnel. This article uses version 2.3.1 on Ubuntu as an example.

Open Source Data Integration Platform SeaTunnel

Introduction

SeaTunnel is a high-performance, open-source big data integration tool under the Apache Software Foundation. It provides a flexible, easy-to-use, and easily extensible data integration solution that can handle data at the scale of hundreds of billions of records.

SeaTunnel provides high-performance data synchronization capabilities for real-time (CDC) and batch data, supporting over a hundred data sources (https://seatunnel.apache.org/docs/2.3.1/Connector-v2-release-state/). It is already in use by hundreds of companies, including Bilibili, Tencent Cloud, and ByteDance.

You can choose to run it on the SeaTunnel Zeta engine, or on the Apache Flink or Spark engines.

Installation

Download version 2.3.1 (https://seatunnel.apache.org/download/), then extract it using the command tar -xzvf apache-seatunnel-*.tar.gz.

This guide uses 2.3.1 rather than 2.3.2 because version 2.3.2 has a bug where MySQL-CDC can’t find the driver (https://github.com/apache/seatunnel/pull/4945/files), which causes the following error:

Caused by: java.sql.SQLException: No suitable driver
at java.sql/java.sql.DriverManager.getDriver(DriverManager.java:298)
at com.zaxxer.hikari.util.DriverDataSource.<init>(DriverDataSource.java:106)
... 20 more

... 11 more

at org.apache.seatunnel.engine.client.job.ClientJobProxy.waitForJobComplete(ClientJobProxy.java:122)
at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:181)

Installing Connector Plugins

Execute bash bin/install-plugin.sh. If you are in China, configure a Maven mirror first; otherwise the download may be slow or fail.

The official documentation says to execute sh bin/install-plugin.sh. On Ubuntu 20.04.2 LTS this failed for me with bin/install-plugin.sh: 54: Bad substitution, because sh on Ubuntu is dash, which does not understand bash-specific substitutions. Running the script with bash avoids the error, and I have submitted a PR to fix the documentation (https://github.com/apache/seatunnel-website/pull/253).

Writing the Configuration File

Create a new configuration file in the config directory, such as mysql-es-test.conf, and add the env configuration. Because we are synchronizing in real time, job.mode is set to "STREAMING"; execution.parallelism sets the job's parallelism (the number of concurrent threads).

env {
  # You can set flink configuration here
  execution.parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 2000
  #execution.checkpoint.interval = 10000
  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}

For real-time synchronization from MySQL, the binlog must be enabled (https://debezium.io/documentation/reference/1.6/connectors/mysql.html#setting-up-mysql).
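Before starting the job, it can help to confirm the binlog settings. The sketch below assumes you have already read the server variables (for example with SHOW VARIABLES) into a dictionary. The function name binlog_problems and the sample dictionary are illustrative only, and the expected values follow the Debezium setup page linked above rather than anything SeaTunnel-specific.

```python
# Settings the Debezium MySQL setup page asks for (assumed to apply here too).
REQUIRED = {
    "log_bin": "ON",
    "binlog_format": "ROW",
    "binlog_row_image": "FULL",
}


def binlog_problems(variables):
    """Return a list of human-readable problems; an empty list means OK.

    `variables` is a dict of MySQL server variables, e.g. built from the
    rows returned by SHOW VARIABLES (hypothetical input for this sketch).
    """
    problems = []
    for name, expected in REQUIRED.items():
        actual = str(variables.get(name, "<missing>")).upper()
        if actual != expected:
            problems.append(f"{name} is {actual}, expected {expected}")
    return problems


# Sample values for illustration only, not read from a real server.
sample = {"log_bin": "ON", "binlog_format": "MIXED", "binlog_row_image": "FULL"}
for line in binlog_problems(sample):
    print(line)
```

An empty result means the three settings match; anything else points at the variable to change in the MySQL server configuration.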

Add the data source configuration (https://seatunnel.apache.org/docs/2.3.1/connector-v2/source/MySQL-CDC#options). result_table_name registers the source output under a temporary table name that later steps can reference. Each entry in table-names must be in database.table form, and base-url must specify the database. The default startup.mode is INITIAL, which first synchronizes historical data and then switches to incremental synchronization.

source {
  MySQL-CDC {
    result_table_name = "t1"
    server-id = 5656
    username = "root"
    password = "pwd"
    table-names = ["db.t1"]
    base-url = "jdbc:mysql://host:3306/db"
  }
}

Add the transform configuration. The Sql transform is used here because it is more flexible than the other transforms. For the list of supported functions, see https://seatunnel.apache.org/docs/2.3.1/transform-v2/sql-functions.

transform {
  Sql {
    source_table_name = "t1"
    query = "SELECT id, alias_name aliasName FROM t1 WHERE c1 = '1'"
  }
}
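To see what this query does to the data, the sketch below runs the same statement against an in-memory SQLite table. SQLite only stands in for SeaTunnel's SQL transform here, and the sample rows are made up; the point is that rows with c1 other than '1' are dropped and alias_name is renamed to aliasName.

```python
import sqlite3

# In-memory SQLite table standing in for the "t1" source table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t1 (id INTEGER, alias_name TEXT, c1 TEXT)")
conn.executemany(
    "INSERT INTO t1 VALUES (?, ?, ?)",
    [(1, "alpha", "1"), (2, "beta", "0"), (3, "gamma", "1")],
)

# Same query text as in the transform block.
cursor = conn.execute("SELECT id, alias_name aliasName FROM t1 WHERE c1 = '1'")
columns = [col[0] for col in cursor.description]  # output field names
rows = cursor.fetchall()                          # rows that pass the filter

print(columns)
print(rows)
conn.close()
```

The output field names are what end up as document fields in the sink, so the alias is how a column gets renamed on the way to Elasticsearch.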

Add the output configuration (https://seatunnel.apache.org/docs/2.3.1/connector-v2/sink/Elasticsearch#options). For real-time CDC synchronization to Elasticsearch, primary_keys must be configured.

sink {
  Elasticsearch {
    hosts = ["host:9200"]
    username = "elastic"
    password = "pwd"
    index = "index_t1"
    # cdc required options
    primary_keys = ["id"]
  }
}
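One way to see why primary_keys matters for CDC: updates and deletes have to find the document written by the earlier insert, which requires a stable document id. The toy model below replays a made-up change stream into a dictionary that stands in for an index. It is not SeaTunnel's sink code; the event shapes, function names, and the "_" delimiter for composite keys are assumptions for illustration.

```python
def doc_id(row, primary_keys, delimiter="_"):
    """Build a document id from the primary-key columns of a row."""
    return delimiter.join(str(row[key]) for key in primary_keys)


def apply_events(events, primary_keys):
    """Replay change events into a dict that stands in for an ES index."""
    index = {}
    for kind, row in events:
        key = doc_id(row, primary_keys)
        if kind in ("insert", "update"):
            index[key] = row          # upsert: same id overwrites the old doc
        elif kind == "delete":
            index.pop(key, None)      # delete needs the id to find the doc
        else:
            raise ValueError(f"unknown event kind: {kind}")
    return index


# Hypothetical change stream for two rows of t1.
events = [
    ("insert", {"id": 1, "aliasName": "alpha"}),
    ("insert", {"id": 2, "aliasName": "beta"}),
    ("update", {"id": 1, "aliasName": "alpha-v2"}),
    ("delete", {"id": 2, "aliasName": "beta"}),
]

index = apply_events(events, primary_keys=["id"])
print(index)
```

Without a key-derived id, the update would land as a second document and the delete would have nothing to target, so the index would drift away from the source table.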

The final configuration file combines the env, source, transform, and sink blocks above, in that order.

Starting the Task

This example uses local mode (https://seatunnel.apache.org/docs/2.3.1/seatunnel-engine/local-mode). Other options include cluster mode (https://seatunnel.apache.org/docs/2.3.1/seatunnel-engine/deployment), as well as Spark and Flink modes.

./bin/seatunnel.sh -e local --config ./config/mysql-es-test.conf
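Once the job is running, a quick way to check that documents are arriving is to ask Elasticsearch for the document count of the target index. The sketch below uses the standard library and the _count endpoint. The function names are made up for this example, and it assumes the cluster is reachable over plain http with basic authentication; adjust the scheme if yours uses https.

```python
import base64
import json
import urllib.request


def count_request(host, index, username, password):
    """Build a GET request for Elasticsearch's _count API with basic auth."""
    token = base64.b64encode(f"{username}:{password}".encode()).decode()
    return urllib.request.Request(
        f"http://{host}/{index}/_count",  # assumes plain http
        headers={"Authorization": f"Basic {token}"},
    )


def parse_count(body):
    """Extract the document count from a _count response body."""
    return json.loads(body)["count"]


def fetch_count(host, index, username, password, timeout=10):
    """Send the request and return the number of documents in the index."""
    request = count_request(host, index, username, password)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return parse_count(response.read())
```

With the values from the sink block, fetch_count("host:9200", "index_t1", "elastic", "pwd") should return a number that grows during the initial snapshot and then follows the changes made in MySQL.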

Conclusion

The open-source data integration platform SeaTunnel makes real-time synchronization from MySQL to Elasticsearch and other targets easy. It is free, and adding synchronized fields is straightforward. For more powerful features, see the official documentation (https://seatunnel.apache.org/docs/2.3.1/about).

The new version comes with a built-in synchronization engine (https://seatunnel.apache.org/docs/2.3.1/seatunnel-engine/about), eliminating the need to rely on Spark or Flink and reducing deployment complexity for synchronization scenarios with small data volumes.

Beginning with the new version, a web UI is provided (https://github.com/apache/seatunnel-web). Currently, it depends heavily on the Apache DolphinScheduler scheduling platform.
