Data Migration from MySQL to OceanBase Using Apache SeaTunnel

Introduction

7 min read · Mar 18, 2025

In this guide, we will walk you through a lightweight data migration and synchronization solution from MySQL to OceanBase using Apache SeaTunnel (SeaTunnel for short), leveraging its built-in Zeta engine, which supports full synchronization, offline incremental synchronization, and Change Data Capture (CDC).

Preparing the Runtime Environment

Before we begin, ensure that your environment is ready.

Install Java

SeaTunnel requires Java 8 or higher. While Java 8 is recommended, later versions should work as well.

After installation, verify that Java is correctly configured by running:

root:~# java -version
openjdk version "17.0.12" 2024-07-16
OpenJDK Runtime Environment (build 17.0.12+7-Debian-2deb11u1)
OpenJDK 64-Bit Server VM (build 17.0.12+7-Debian-2deb11u1, mixed mode, sharing)

Make sure that JAVA_HOME is properly set.
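For example, for a package-managed OpenJDK on Debian/Ubuntu you might add the following to your shell profile (the JDK path below is an assumption; adjust it to your installation):

```shell
# Example for a Debian/Ubuntu OpenJDK 17 package; adjust the path to your JDK
export JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64
export PATH="$JAVA_HOME/bin:$PATH"
echo "JAVA_HOME=$JAVA_HOME"
```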

Download and Install Apache SeaTunnel

Visit the official SeaTunnel website to download the latest version.

For this guide, we will use version 2.3.9:

# Download
wget https://dlcdn.apache.org/seatunnel/2.3.9/apache-seatunnel-2.3.9-bin.tar.gz
# Extract
tar -zxvf apache-seatunnel-2.3.9-bin.tar.gz

Installing Connector Plugins

SeaTunnel’s installation package only contains the core framework and the Zeta engine. To connect with various data sources, you need to manually download and configure the required plugins.

Automatic Plugin Installation

To automatically download the necessary connectors, modify the config/plugin_config file and specify the required connectors. By default, the file includes all connectors, but for this guide, we only include the essential ones:

connector-cdc-mysql
connector-jdbc
connector-fake
connector-console

Run the following command to install the plugins:

sh bin/install-plugin.sh 2.3.9

Manual Plugin Installation

Alternatively, you can manually download the required plugin jars from the Apache Maven repository.

Download the necessary .jar files, for example:

connector-cdc-mysql-2.3.9.jar
connector-console-2.3.9.jar
connector-fake-2.3.9.jar
connector-jdbc-2.3.9.jar
seatunnel-transforms-v2-2.3.9.jar

After downloading, move the files into the connectors directory of your SeaTunnel installation.

Verifying Connector Installation

To check if the connectors are installed correctly, run:

./bin/seatunnel-connector.sh -l

Source
FakeSource MySQL-CDC Jdbc


Sink
Jdbc Console


Transform
Copy DynamicCompile FieldMapper Filter FilterRowKind JsonPath LLM Replace Split Sql

Since OceanBase is compatible with the MySQL protocol, we will use the MySQL JDBC driver to connect to it. Download the driver from the official MySQL website.

Once downloaded, place the mysql-connector-j-9.0.0.jar file into the lib directory of your SeaTunnel installation ($SEATUNNEL_HOME/lib).

Verifying SeaTunnel Installation

To confirm that SeaTunnel is installed correctly, execute a batch processing test using the default configuration template:

./bin/seatunnel.sh --config ./config/v2.batch.config.template -m local

Command Explanation

  • seatunnel.sh → Standard SeaTunnel startup script
  • --config → Specifies the job configuration file
  • -m local → Runs the job in local mode

If everything is working correctly, you should see output similar to this:

2022-12-19 11:01:45,417 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - output rowType: name<STRING>, age<INT>
2022-12-19 11:01:46,489 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=1: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: CpiOd, 8520946
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=2: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: eQqTs, 1256802974
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=3: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: UsRgO, 2053193072
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=4: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: jDQJj, 1993016602
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=5: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: rqdKp, 1392682764
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=6: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: wCoWN, 986999925
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=7: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: qomTU, 72775247
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=8: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: jcqXR, 1074529204
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=9: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: AkWIO, 1961723427
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=10: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: hBoib, 929089763
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=11: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: GSvzm, 827085798
2022-12-19 11:01:46,491 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=12: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: NNAYI, 94307133
2022-12-19 11:01:46,491 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=13: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: EexFl, 1823689599
2022-12-19 11:01:46,491 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=14: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: CBXUb, 869582787
2022-12-19 11:01:46,491 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=15: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: Wbxtm, 1469371353
2022-12-19 11:01:46,491 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=16: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: mIJDt, 995616438

At the end of the job execution, you will see a summary log:

***********************************************
Job Statistic Information
***********************************************
Start Time : 2024-08-29 22:45:29
End Time : 2024-08-29 22:45:33
Total Time(s) : 4
Total Read Count : 32
Total Write Count : 32
Total Failed Count : 0
***********************************************

This confirms that SeaTunnel is working correctly.

Full Data Synchronization

Creating the Test Table

To verify full data synchronization, we create test tables in both MySQL and OceanBase (run the same DDL on both sides, since we migrate the schema manually).

Step 1: Creating the MySQL Tables

CREATE TABLE `table1` (
  `id` INT NOT NULL AUTO_INCREMENT,
  `value1` VARCHAR(255) NOT NULL,
  `value2` VARCHAR(255),
  `value3` VARCHAR(255),
  `value4` VARCHAR(255),
  `value5` VARCHAR(255),
  `created_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  `updated_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  UNIQUE INDEX `idx_value1` (`value1`),
  INDEX `idx_value2_value3` (`value2`, `value3`),
  INDEX `idx_value3_value4_value5` (`value3`, `value4`, `value5`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

CREATE TABLE `table2` (
  `id` INT NOT NULL AUTO_INCREMENT,
  `value1` VARCHAR(255) NOT NULL,
  `value2` VARCHAR(255),
  `value3` VARCHAR(255),
  `value4` VARCHAR(255),
  `value5` VARCHAR(255),
  `created_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  `updated_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  UNIQUE INDEX `idx_value1` (`value1`),
  INDEX `idx_value2_value3` (`value2`, `value3`),
  INDEX `idx_value3_value4_value5` (`value3`, `value4`, `value5`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

We used Navicat to generate 100,000 records in each table.
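If you don't use Navicat, a recursive CTE can generate comparable test data on MySQL 8.0+ (the value patterns below are arbitrary):

```sql
-- Allow the CTE to recurse 100,000 times (MySQL 8.0+)
SET SESSION cte_max_recursion_depth = 100000;
INSERT INTO seatunnel.table1 (value1, value2, value3, value4, value5)
WITH RECURSIVE seq (n) AS (
  SELECT 1
  UNION ALL
  SELECT n + 1 FROM seq WHERE n < 100000
)
SELECT CONCAT('v1-', n), CONCAT('v2-', n), CONCAT('v3-', n),
       CONCAT('v4-', n), CONCAT('v5-', n)
FROM seq;
```

Repeat the same statement for table2.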

Configuring SeaTunnel for Full Synchronization

Full Data Synchronization Configuration File

Note: We recommend migrating the table schema manually, since automatic schema migration may run into issues and does not create indexes.

Single-Table Full Sync

env {
  parallelism = 5
  job.mode = "BATCH"
}
source {
  Jdbc {
    url = "jdbc:mysql://127.0.0.1:3306/mysql?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
    driver = "com.mysql.cj.jdbc.Driver"
    connection_check_timeout_sec = 100
    user = "xxx"
    password = "xxx"
    query = "select * from seatunnel.table1"
  }
}
sink {
  jdbc {
    url = "jdbc:mysql://127.0.0.1:2883/mysql?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "xxx@xxx"
    password = "xxx"
    # Automatically generate SQL statements
    generate_sink_sql = true
    database = seatunnel
    table = table1
  }
}

Result:

***********************************************
Job Statistic Information
***********************************************
Start Time : 2024-08-30 15:05:39
End Time : 2024-08-30 15:05:47
Total Time(s) : 8
Total Read Count : 100000
Total Write Count : 100000
Total Failed Count : 0
***********************************************

Multi-Table Full Extraction

env {
  parallelism = 5
  job.mode = "BATCH"
}
source {
  Jdbc {
    url = "jdbc:mysql://127.0.0.1:3306/mysql?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
    driver = "com.mysql.cj.jdbc.Driver"
    connection_check_timeout_sec = 100
    user = "xxx"
    password = "xxx"
    table_list = [
      {
        table_path = "seatunnel.table1"
      },
      {
        table_path = "seatunnel.table2"
        query = "select * from seatunnel.table2 where id > 100"
      }
    ]
  }
}
sink {
  jdbc {
    url = "jdbc:mysql://127.0.0.1:2883/mysql?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "xxx@xxx"
    password = "xxx"
    # Automatically generate SQL statements
    generate_sink_sql = true
    database = seatunnel
    table_list = ["seatunnel.table1", "seatunnel.table2"]
  }
}

Result:

***********************************************
Job Statistic Information
***********************************************
Start Time : 2024-08-30 15:10:09
End Time : 2024-08-30 15:10:20
Total Time(s) : 10
Total Read Count : 200000
Total Write Count : 200000
Total Failed Count : 0
***********************************************

Incremental Synchronization Configuration File

For incremental sync, a simple approach is to use a query that filters on an id or updated_at column.

env {
  parallelism = 1
  job.mode = "BATCH"
}
source {
  Jdbc {
    url = "jdbc:mysql://127.0.0.1:3306/mysql?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
    driver = "com.mysql.cj.jdbc.Driver"
    connection_check_timeout_sec = 100
    user = "xxx"
    password = "xxx"
    query = "SELECT * FROM seatunnel.table1 WHERE updated_at > '2024-01-01'"
  }
}
sink {
  jdbc {
    url = "jdbc:mysql://127.0.0.1:2883/mysql?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "xxx@xxx"
    password = "xxx"
    generate_sink_sql = true
    database = seatunnel
    table = table1
  }
}

Note: The sink performs insert and update operations based on the primary key (upsert). However, manually updating the configuration file before each incremental run is cumbersome. We recommend pairing SeaTunnel with Apache DolphinScheduler to build a workflow: DolphinScheduler can query the maximum timestamp or id from the sink and pass it to the job as a workflow variable.
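In that setup, the workflow's first task runs a query like the following against the sink and stores the result in a workflow variable (the variable name is illustrative):

```sql
-- Run on the sink (OceanBase) before the SeaTunnel task;
-- COALESCE supplies a floor value for the very first run.
SELECT COALESCE(MAX(updated_at), '1970-01-01 00:00:00') AS max_updated_at
FROM seatunnel.table1;
```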

Example configuration with a variable:

env {
  parallelism = 1
  job.mode = "BATCH"
}
source {
  Jdbc {
    url = "jdbc:mysql://127.0.0.1:3306/mysql?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
    driver = "com.mysql.cj.jdbc.Driver"
    connection_check_timeout_sec = 100
    user = "xxx"
    password = "xxx"
    query = "SELECT * FROM seatunnel.table1 WHERE updated_at > '${max_updated_at}'"
  }
}
sink {
  jdbc {
    url = "jdbc:mysql://127.0.0.1:2883/mysql?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "xxx@xxx"
    password = "xxx"
    generate_sink_sql = true
    database = seatunnel
    table = table1
  }
}

The multi-table configuration is similar.
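For reference, a multi-table incremental job combines table_list with per-table filter queries; the ${max_updated_at_t1} and ${max_updated_at_t2} variables below are hypothetical workflow variables you would supply (for example, from DolphinScheduler):

```hocon
env {
  parallelism = 1
  job.mode = "BATCH"
}
source {
  Jdbc {
    url = "jdbc:mysql://127.0.0.1:3306/mysql?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "xxx"
    password = "xxx"
    table_list = [
      {
        table_path = "seatunnel.table1"
        query = "SELECT * FROM seatunnel.table1 WHERE updated_at > '${max_updated_at_t1}'"
      },
      {
        table_path = "seatunnel.table2"
        query = "SELECT * FROM seatunnel.table2 WHERE updated_at > '${max_updated_at_t2}'"
      }
    ]
  }
}
sink {
  jdbc {
    url = "jdbc:mysql://127.0.0.1:2883/mysql?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "xxx@xxx"
    password = "xxx"
    generate_sink_sql = true
    database = seatunnel
    table_list = ["seatunnel.table1", "seatunnel.table2"]
  }
}
```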

CDC Synchronization Configuration File

Manual Table Schema Migration

Due to issues with SeaTunnel’s OceanBase component, schema migration can be error-prone. It is recommended to migrate the table schema manually.

Check MySQL Binlog Status

Grant the necessary privileges to the replication user. On MySQL 8.0+, create the user first (the combined GRANT ... IDENTIFIED BY syntax was removed):

mysql> CREATE USER IF NOT EXISTS 'user'@'%' IDENTIFIED BY 'password';
mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user'@'%';
mysql> FLUSH PRIVILEGES;

Verify that the binlog is enabled:

mysql> show variables where variable_name in ('log_bin', 'binlog_format', 'binlog_row_image', 'gtid_mode', 'enforce_gtid_consistency');
+--------------------------+----------------+
| Variable_name | Value |
+--------------------------+----------------+
| binlog_format | ROW |
| binlog_row_image | FULL |
| enforce_gtid_consistency | ON |
| gtid_mode | ON |
| log_bin | ON |
+--------------------------+----------------+

If the settings differ from the above, adjust your my.cnf file accordingly. Note that creating a consistent snapshot of a large database can hit read timeouts; raise interactive_timeout and wait_timeout as needed.
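A minimal my.cnf fragment that enables these settings might look like this (values are illustrative; server-id and the timeouts must fit your deployment):

```ini
[mysqld]
server-id                = 1
log_bin                  = mysql-bin
binlog_format            = ROW
binlog_row_image         = FULL
gtid_mode                = ON
enforce_gtid_consistency = ON
# Raise timeouts to avoid read timeouts during large consistent snapshots
interactive_timeout      = 28800
wait_timeout             = 28800
```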

After preparing the environment, write the configuration file.

env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 10000
}

source {
  MySQL-CDC {
    base-url = "jdbc:mysql://127.0.0.1:3306/mysql"
    username = "xxx"
    password = "xxx@xxx"
    table-names = ["seatunnel.table1", "seatunnel.table2"]
    startup.mode = "initial"
  }
}

sink {
  jdbc {
    url = "jdbc:mysql://127.0.0.1:2883/mysql?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "xxx@xxx"
    password = "xxx"
    database = "seatunnel" # Target database
    table-names = ["seatunnel.table1", "seatunnel.table2"]
    generate_sink_sql = true # Automatically generate SQL
  }
}

Once started, the job will first perform a historical data migration, then process CDC changes.
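To verify that changes are flowing, make a change on the source and check the sink shortly afterwards (the row and value below are illustrative):

```sql
-- On the MySQL source: modify a row
UPDATE seatunnel.table1 SET value2 = 'cdc-test' WHERE id = 1;

-- On the OceanBase sink, a few seconds later the change should appear:
SELECT value2 FROM seatunnel.table1 WHERE id = 1;
```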

Important Note:

Upon startup, SeaTunnel will execute different operations based on the configured tables and the startup.mode setting. The startup.mode options are as follows:

  • initial: Synchronizes historical data first, then incremental data.
  • earliest: Starts from the earliest offset.
  • latest: Starts from the latest offset.
  • specific: Starts from a user-provided specific offset.

If you use specific, you must also provide the binlog file name (startup.specific-offset.file) and the binlog position (startup.specific-offset.pos) to start from.
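A specific-offset source block might look like this (the binlog file name and position are placeholders; take real values from SHOW MASTER STATUS or your binlog index):

```hocon
source {
  MySQL-CDC {
    base-url = "jdbc:mysql://127.0.0.1:3306/mysql"
    username = "xxx"
    password = "xxx"
    table-names = ["seatunnel.table1"]
    startup.mode = "specific"
    startup.specific-offset.file = "mysql-bin.000003" # placeholder binlog file
    startup.specific-offset.pos = 4                   # placeholder position
  }
}
```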

Conclusion

This article has detailed how to configure full, incremental, and CDC synchronization using Apache SeaTunnel. We covered:

  • Full sync configuration for single and multi-table extraction.
  • Incremental sync configuration using query filters (with an option to integrate with Apache DolphinScheduler).
  • CDC sync configuration, including prerequisites like binlog verification.

By following these steps, you can achieve a complete, end-to-end data migration and synchronization solution. Thank you for reading, and we welcome your feedback! 🚀

Written by Apache SeaTunnel

The next-generation high-performance, distributed, massive data integration tool.
