Simplifying Data Flow By Multi-Table Synchronization with Apache SeaTunnel
In addition to data synchronization between single tables, Apache SeaTunnel also supports synchronization from a single table to multiple tables, from multiple tables to a single table, and from multiple tables to multiple tables. Below are brief examples of how to implement each of these patterns.
Single Table to Single Table
This pattern uses one source and one sink.
The following job synchronizes a table from MySQL to MySQL with no intermediate transformation:
env {
# You can set flink configuration here
execution.parallelism = 2
job.mode = "BATCH"
}
source{
Jdbc {
url = "jdbc:mysql://127.0.0.1:3306/test"
driver = "com.mysql.cj.jdbc.Driver"
connection_check_timeout_sec = 100
user = "user"
password = "password"
# Select exactly the columns that the sink statement binds, in the same order
query = "select id, region_name from base_region"
}
}
transform {
# If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
# please go to https://seatunnel.apache.org/docs/transform/sql
}
sink {
jdbc {
url = "jdbc:mysql://127.0.0.1:3306/dw"
driver = "com.mysql.cj.jdbc.Driver"
connection_check_timeout_sec = 100
user = "user"
password = "password"
query = "insert into base_region(id,region_name) values(?,?)"
}
}
Execute the task:
./bin/seatunnel.sh --config ./config/mysql2mysql_batch.conf
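The job above assumes that a base_region table already exists in the dw database. The article does not show its definition, so the following is only a sketch: the column names id and region_name come from the sink statement, while the types, lengths, and primary key are assumptions made for illustration.

```sql
-- Hypothetical target table for the single-table job.
-- Only the names base_region, id, and region_name come from the job config;
-- the column types, lengths, and the primary key are assumptions.
CREATE TABLE IF NOT EXISTS dw.base_region (
  `id` VARCHAR(20) NOT NULL COMMENT 'Region id',
  `region_name` VARCHAR(64) COMMENT 'Region name',
  PRIMARY KEY (`id`)
);
```

Adjust the types to match the real source table before running the job.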
Single Table to Multiple Tables
This pattern uses one source and multiple sinks.
The following job reads a single user table from MySQL. Two SQL transforms then split the rows into male and female users, and each sink writes its subset to a different MySQL table:
env {
execution.parallelism = 2
job.mode = "BATCH"
}
source {
Jdbc {
url = "jdbc:mysql://127.0.0.1:3306/test"
driver = "com.mysql.cj.jdbc.Driver"
connection_check_timeout_sec = 100
user = "user"
password = "password"
result_table_name="t_user"
query = "select * from t_user;"
}
}
transform {
Sql {
source_table_name = "t_user"
result_table_name = "t_user_nan"
# '男' is the stored value for male; "nan" in the table name is its pinyin spelling
query = "select id,name,birth,gender from t_user where gender ='男';"
}
Sql {
source_table_name = "t_user"
result_table_name = "t_user_nv"
# '女' is the stored value for female; "nv" in the table name is its pinyin spelling
query = "select id,name,birth,gender from t_user where gender ='女';"
}
}
sink {
jdbc {
url = "jdbc:mysql://127.0.0.1:3306/dw"
driver = "com.mysql.cj.jdbc.Driver"
connection_check_timeout_sec = 100
user = "user"
password = "password"
source_table_name = "t_user_nan"
query = "insert into t_user_nan(id,name,birth,gender) values(?,?,?,?)"
}
jdbc {
url = "jdbc:mysql://127.0.0.1:3306/dw"
driver = "com.mysql.cj.jdbc.Driver"
connection_check_timeout_sec = 100
user = "user"
password = "password"
source_table_name = "t_user_nv"
query = "insert into t_user_nv(id,name,birth,gender) values(?,?,?,?)"
}
}
Execute the task:
./bin/seatunnel.sh --config ./config/mysql2mysql_1n.conf
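The configuration above relies on three tables that the article does not define: t_user in the test database, and t_user_nan and t_user_nv in the dw database. A minimal sketch of what they might look like follows. The table and column names are taken from the job configuration, but every type, length, and key is an assumption.

```sql
-- Hypothetical definitions; only the table and column names come from the
-- job configuration. Types, lengths, and keys are assumptions.

-- Source table in the test database
CREATE TABLE IF NOT EXISTS test.t_user (
  `id` BIGINT NOT NULL COMMENT 'User id',
  `name` VARCHAR(64) COMMENT 'User name',
  `birth` DATE COMMENT 'Date of birth',
  `gender` VARCHAR(8) COMMENT 'Gender: 男 (male) or 女 (female)',
  PRIMARY KEY (`id`)
);

-- Target tables in the dw database, one per gender.
-- CREATE TABLE ... LIKE copies the column definitions and indexes of t_user;
-- this works here because both databases live on the same MySQL server.
CREATE TABLE IF NOT EXISTS dw.t_user_nan LIKE test.t_user;
CREATE TABLE IF NOT EXISTS dw.t_user_nv LIKE test.t_user;
```

Giving both targets the same structure as the source keeps the four-column INSERT statements in the two sinks aligned with the columns selected by the SQL transforms.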
Multiple Tables to Single Table
This pattern uses multiple sources and one sink.
Suppose there is a switch usage table and a router usage table, and the target is an OLAP table that combines the data from both.
The table structures are as follows:
-- dw Source table 1
CREATE TABLE IF NOT EXISTS ads_device_switch_performance (
`event_time` timestamp COMMENT 'Business Time',
`device_id` VARCHAR(32) COMMENT 'Equipment id',
`device_type` VARCHAR(32) COMMENT 'Equipment Type',
`device_name` VARCHAR(128) COMMENT 'Equipment Name',
`cpu_usage` INT COMMENT 'CPU Usage Rate'
) ;
INSERT INTO `ads_device_switch_performance` VALUES ('2024-01-15 14:25:11', '2001', '2', 'Switch 1', 49);
INSERT INTO `ads_device_switch_performance` VALUES ('2024-01-17 22:25:40', '2002', '1', 'Switch 2', 65);
-- dw Source table 2
CREATE TABLE IF NOT EXISTS ads_device_router_performance (
`event_time` timestamp COMMENT 'Business Time',
`device_id` VARCHAR(32) COMMENT 'Equipment id',
`device_type` VARCHAR(32) COMMENT 'Equipment Type',
`device_name` VARCHAR(128) COMMENT 'Equipment Name',
`cpu_usage` INT COMMENT 'CPU Usage Rate'
);
INSERT INTO `ads_device_router_performance` VALUES ('2024-01-17 21:23:22', '1001', '1', 'Router 1', 35);
INSERT INTO `ads_device_router_performance` VALUES ('2024-01-16 17:23:53', '1002', '2', 'Router 2', 46);
-------------------------------------------------------------------------------
-- olap Target table
CREATE TABLE `device_performance` (
`id` INT NOT NULL AUTO_INCREMENT COMMENT 'Table Primary Key',
`event_time` VARCHAR(32) NOT NULL COMMENT 'Business Time',
`device_id` VARCHAR(32) COMMENT 'Equipment id',
`device_type` VARCHAR(32) COMMENT 'Equipment Type',
`device_name` VARCHAR(128) NOT NULL COMMENT 'Equipment Name',
`cpu_usage` FLOAT NOT NULL COMMENT 'CPU Usage Rate Unit %',
`create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
`update_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Update time',
PRIMARY KEY (`id`)
) COMMENT='Equipment status';
Synchronize the switch data and the router data into the single OLAP target table, using SQL components to add the create_time and update_time columns:
env {
job.mode="BATCH"
job.name="device_performance"
}
source {
Jdbc {
url="jdbc:mysql://127.0.0.1:3306/dw?allowMultiQueries=true&characterEncoding=utf-8"
driver="com.mysql.cj.jdbc.Driver"
user = "user"
password = "password"
result_table_name="switch_src"
query="SELECT `event_time`, `device_id`, `device_type`, `device_name`, `cpu_usage` FROM ads_device_switch_performance;"
}
Jdbc {
url="jdbc:mysql://127.0.0.1:3306/dw?allowMultiQueries=true&characterEncoding=utf-8"
driver="com.mysql.cj.jdbc.Driver"
user = "user"
password = "password"
result_table_name="router_src"
query="SELECT `event_time`, `device_id`, `device_type`, `device_name`, `cpu_usage` FROM ads_device_router_performance;"
}
}
transform {
Sql {
source_table_name = "switch_src"
result_table_name = "switch_dst"
query = "SELECT event_time , device_id, device_type, device_name, cpu_usage, NOW() AS create_time, NOW() AS update_time FROM switch_src;"
}
Sql {
source_table_name = "router_src"
result_table_name = "router_dst"
query = "SELECT event_time, device_id, device_type, device_name, cpu_usage, NOW() AS create_time, NOW() AS update_time FROM router_src;"
}
}
sink {
Jdbc {
url="jdbc:mysql://127.0.0.1:3306/olap?allowMultiQueries=true&characterEncoding=utf-8"
driver="com.mysql.cj.jdbc.Driver"
user = "user"
password = "password"
source_table_name = "switch_dst"
query="INSERT INTO device_performance VALUES(null,?, ?, ?, ?, ?, ?, ?) ;"
}
Jdbc {
url="jdbc:mysql://127.0.0.1:3306/olap?allowMultiQueries=true&characterEncoding=utf-8"
driver="com.mysql.cj.jdbc.Driver"
user = "user"
password = "password"
source_table_name = "router_dst"
query="INSERT INTO device_performance VALUES(null,?, ?, ?, ?, ?, ?, ?) ;"
}
}
Execute the task:
./bin/seatunnel.sh --config ./syn_job/mysql2mysql_n1_batch.conf
It works!
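One way to confirm the result is to query the target table once the job has finished. This is only a sketch: it assumes the olap.device_performance table was empty before the run and that the four sample rows shown earlier are the only source data.

```sql
-- Run against the MySQL server that hosts the olap database.
-- Under the assumptions above, this should list the two routers (1001, 1002)
-- followed by the two switches (2001, 2002).
SELECT `device_id`, `device_name`, `cpu_usage`, `event_time`, `create_time`
FROM olap.device_performance
ORDER BY `device_id`;
```

The create_time values come from the NOW() expression in the SQL transforms, so they reflect when the job ran rather than when the events occurred.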
Multiple Tables to Multiple Tables
This pattern uses multiple sources and multiple sinks.
The following job synchronizes the switch usage data and the router usage data to their own destination tables, processing each with an intermediate SQL component:
env {
job.mode="BATCH"
job.name="device_performance"
}
source {
Jdbc {
url="jdbc:mysql://127.0.0.1:3306/dw?allowMultiQueries=true&characterEncoding=utf-8"
driver="com.mysql.cj.jdbc.Driver"
user = "user"
password = "password"
result_table_name="switch_src"
query="SELECT `event_time`, `device_id`, `device_type`, `device_name`, `cpu_usage` FROM ads_device_switch_performance;"
}
Jdbc {
url="jdbc:mysql://127.0.0.1:3306/dw?allowMultiQueries=true&characterEncoding=utf-8"
driver="com.mysql.cj.jdbc.Driver"
user = "user"
password = "password"
result_table_name="router_src"
query="SELECT `event_time`, `device_id`, `device_type`, `device_name`, `cpu_usage` FROM ads_device_router_performance;"
}
}
transform {
Sql {
source_table_name = "switch_src"
result_table_name = "switch_dst"
query = "SELECT event_time , device_id, device_type, device_name, cpu_usage, NOW() AS create_time, NOW() AS update_time FROM switch_src;"
}
Sql {
source_table_name = "router_src"
result_table_name = "router_dst"
query = "SELECT event_time, device_id, device_type, device_name, cpu_usage, NOW() AS create_time, NOW() AS update_time FROM router_src;"
}
}
sink {
Jdbc {
url="jdbc:mysql://127.0.0.1:3306/olap?allowMultiQueries=true&characterEncoding=utf-8"
driver="com.mysql.cj.jdbc.Driver"
user = "user"
password = "password"
source_table_name = "switch_dst"
query="INSERT INTO device_performance_switch VALUES(null,?, ?, ?, ?, ?, ?, ?) ;"
}
Jdbc {
url="jdbc:mysql://127.0.0.1:3306/olap?allowMultiQueries=true&characterEncoding=utf-8"
driver="com.mysql.cj.jdbc.Driver"
user = "user"
password = "password"
source_table_name = "router_dst"
query="INSERT INTO device_performance_router VALUES(null,?, ?, ?, ?, ?, ?, ?) ;"
}
}
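The two destination tables used by these sinks, device_performance_switch and device_performance_router, are not defined in the article. Because each sink inserts a null id followed by seven values, a reasonable assumption is that both tables share the structure of device_performance. Under that assumption, they could be created like this:

```sql
-- Assumption: both target tables have the same eight columns as
-- device_performance (auto-increment id plus seven data columns),
-- which is what the INSERT statements in the sinks imply.
CREATE TABLE IF NOT EXISTS olap.device_performance_switch LIKE olap.device_performance;
CREATE TABLE IF NOT EXISTS olap.device_performance_router LIKE olap.device_performance;
```

If the real tables differ, list the column names explicitly in each sink's INSERT statement instead of relying on column order.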
Conclusion
In summary, Apache SeaTunnel handles one-to-one, one-to-many, many-to-one, and many-to-many table synchronization with the same building blocks: sources, SQL transforms, and sinks, wired together through result_table_name and source_table_name. With this multi-table synchronization capability, enterprises can move data between different systems and databases more smoothly and manage and use that data more efficiently. We hope this article helps readers better understand and apply Apache SeaTunnel multi-table synchronization.
About Apache SeaTunnel
Apache SeaTunnel is an easy-to-use, ultra-high-performance distributed data integration platform that supports real-time synchronization of massive amounts of data and can synchronize hundreds of billions of data per day stably and efficiently.
To become a speaker for Apache SeaTunnel, fill out this form: https://forms.gle/vtpQS6ZuxqXMt6DT6 :)
Why do we need Apache SeaTunnel?
Apache SeaTunnel does everything it can to solve the problems you may encounter in synchronizing massive amounts of data.
- Data loss and duplication
- Task buildup and latency
- Low throughput
- Long application-to-production cycle time
- Lack of application status monitoring
Apache SeaTunnel Usage Scenarios
- Massive data synchronization
- Massive data integration
- ETL of large volumes of data
- Massive data aggregation
- Multi-source data processing
Features of Apache SeaTunnel
- Rich components
- High scalability
- Easy to use
- Mature and stable
How to get started with Apache SeaTunnel quickly?
Want to experience Apache SeaTunnel quickly? SeaTunnel 2.1.0 takes 10 seconds to get you up and running.
https://seatunnel.apache.org/docs/2.1.0/developement/setup
How can I contribute?
We invite all partners who are interested in making local open-source global to join the Apache SeaTunnel contributors family and foster open-source together!
Submit an issue:
https://github.com/apache/seatunnel/issues
Contribute code to:
https://github.com/apache/seatunnel/pulls
Subscribe to the community development mailing list:
dev-subscribe@seatunnel.apache.org
Development mailing list:
dev@seatunnel.apache.org
Join Slack:
https://join.slack.com/t/apacheseatunnel/shared_invite/zt-1kcxzyrxz-lKcF3BAyzHEmpcc4OSaCjQ
Follow Twitter:
https://twitter.com/ASFSeaTunnel
Join us now!❤️❤️