Apache SeaTunnel: Navigating the Future of Vast Data Integration Built on Spark and Flink
1. Getting to Know Apache SeaTunnel
Apache SeaTunnel is an incredibly user-friendly, high-performance platform for massive data integration. It operates both in real-time streaming and offline batch processing modes. Built on Apache Spark and Apache Flink, it supports real-time synchronization and transformation of vast datasets.
SeaTunnel focuses on data integration and synchronization, addressing common challenges in the data integration domain:
- Diverse Data Sources: There are hundreds of commonly used data sources with mutually incompatible versions, making it hard for users to find a tool that comprehensively supports them all.
- Complex Sync Scenarios: Data synchronization needs to support offline full sync, offline incremental sync, CDC, real-time sync, and full database sync.
- High Resource Demands: Many existing tools require significant computational or JDBC connection resources for real-time sync of massive tables.
- Lack of Quality & Monitoring: Data loss or duplication often occurs during data integration and synchronization. Plus, there’s a general lack of monitoring.
- Complex Tech Stack: Companies use various tech components, so users need to develop syncing programs tailored to each.
- Challenging Maintenance: Due to differing underlying technologies like Flink or Spark, offline and real-time syncing often have to be managed separately.
2. System Architecture, Workflow, and Features
Apache SeaTunnel System Architecture: Input/Source [Data Source Input] -> Filter/Transform [Data Processing] -> Output/Sink [Result Output]
The diagram above depicts the entire workflow of Apache SeaTunnel. The data processing pipeline consists of multiple filters to meet various data processing requirements. If you’re familiar with SQL, you can use it to construct the data processing pipeline, making the process more efficient. Currently, the list of filters supported by Apache SeaTunnel is expanding.
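As an illustration of the SQL-based approach, SeaTunnel's transform-v2 plugin set includes an Sql transform. The fragment below is a sketch of an SQL processing step; the table names and query are illustrative, not from the original text:

```hocon
# Sketch of an SQL-based transform step; table names are illustrative.
transform {
  Sql {
    source_table_name = "fake"
    result_table_name = "fake_adults"
    # Standard SQL over the upstream table:
    query = "select name, age from fake where age > 18"
  }
}
```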
Features of Apache SeaTunnel:
- Rich and Extensible Connectors: Apache SeaTunnel offers a Connector API independent of any specific execution engine. Connectors developed using this API can run on various engines, such as the SeaTunnel Engine, Flink, and Spark.
- Connector Plugins: Its plugin design enables users to easily develop their connectors and integrate them into the Apache SeaTunnel project. Over 100 connectors are supported, with the number rapidly increasing.
- Batch-Stream Integration: The connectors developed using the Apache SeaTunnel Connector API are compatible with offline syncing, real-time syncing, full syncing, and incremental syncing.
- Support for Multiple Engines: While Apache SeaTunnel primarily uses the SeaTunnel Engine for data synchronization, it also supports Flink or Spark as the execution engines for its connectors.
- JDBC Multiplexing: Apache SeaTunnel supports syncing for multiple tables or entire databases, solving the issue of excessive JDBC connections.
- High Throughput with Low Latency: Apache SeaTunnel offers parallel reading and writing, delivering stable, reliable, high-throughput, and low-latency data synchronization.
- Comprehensive Real-time Monitoring: Apache SeaTunnel provides detailed monitoring for every step in the data synchronization process.
- Two Job Development Modes: Code-based and Canvas Design, offering visual management, scheduling, running, and monitoring capabilities.
3. SeaTunnel Work Architecture
The operation process of Apache SeaTunnel is as follows.
Users configure job details and select an execution engine to submit jobs. The Source Connector is responsible for parallel data reading and sends the data to the downstream Transform or directly to the Sink. The Sink then writes the data to the desired destination. Notably, both Sources and Transforms, as well as Sinks, can be easily developed and extended.
SeaTunnel is an EL(T) data integration platform. Thus, in SeaTunnel, Transform is solely for straightforward data transformations, like converting column data to uppercase or lowercase, renaming columns, or splitting one column into several.
The default engine used by SeaTunnel is the SeaTunnel Engine. If you opt for Flink or Spark engines, SeaTunnel will package the Connector as a Flink or Spark program and submit it for execution.
- Source Connectors: SeaTunnel supports reading data from various relational databases, graph databases, NoSQL databases, document databases, in-memory databases, different distributed file systems like HDFS, cloud storages like S3 and OSS, and also many standard SaaS service data.
- Transform Connectors: If the source and sink schemas differ, you can use a transform connector to modify the schema read from the source so that it matches the sink's schema.
- Sink Connectors: SeaTunnel supports writing data to a plethora of relational databases, graph databases, NoSQL databases, document databases, in-memory databases, varied distributed file systems, cloud storages, and several common SaaS services.
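As a concrete example, the schema-adjusting step described above might look like the fragment below. This is a sketch using the FieldMapper plugin from SeaTunnel's transform-v2 set; treat the exact option names as illustrative:

```hocon
# Sketch: rename/select columns so the source schema matches the sink's.
transform {
  FieldMapper {
    source_table_name = "fake"
    result_table_name = "fake_mapped"
    # Map source fields to target fields (rename "name" to "username"):
    field_mapper = {
      name = username
      age = age
    }
  }
}
```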
4. Deploying SeaTunnel
- Install Java: Ensure you have Java 8 or higher.
- Download SeaTunnel:
export version="2.3.1"
wget "https://archive.apache.org/dist/incubator/seatunnel/${version}/apache-seatunnel-incubating-${version}-bin.tar.gz"
tar -xzvf "apache-seatunnel-incubating-${version}-bin.tar.gz"
- Install Connectors: Starting from version 2.2.0-beta, the binary package doesn't provide connector dependencies by default. Thus, on first use, execute the following command to install the connectors:
sh bin/install-plugin.sh 2.3.1
Alternatively, you can manually download the connector jars and move them to the connectors/seatunnel directory.
If you need a specific version of the connector, for instance, 2.3.0-beta, run:
sh bin/install-plugin.sh 2.3.0-beta
Usually, you won’t need all connector plugins. You can specify the plugins you require in the config/plugin_config. For example, if you only need the connector-console plugin, modify the plugin_config to:
--connectors-v2--
connector-console
--end--
If you want the sample application to work correctly, add the following plugins:
--connectors-v2--
connector-fake
connector-console
--end--
You can find all supported connectors and their respective plugin_config configuration names in ${SEATUNNEL_HOME}/connectors/plugins-mapping.properties.
Note: If you wish to manually download and install connector plugins, consider the following:
The connectors directory must include a seatunnel subdirectory; if it's absent, create it manually. Then, for manual installation of V2 connector plugins, simply download the required V2 connector plugin jar and place it in the connectors/seatunnel directory.
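The manual steps above can be sketched as follows; the SEATUNNEL_HOME path and jar name are illustrative:

```shell
# Manual V2 connector install sketch; paths and jar names are illustrative.
SEATUNNEL_HOME="apache-seatunnel-incubating-2.3.1"

# The connectors directory must contain a "seatunnel" subdirectory:
mkdir -p "${SEATUNNEL_HOME}/connectors/seatunnel"

# After downloading a connector jar, move it into that directory, e.g.:
# mv connector-console-*.jar "${SEATUNNEL_HOME}/connectors/seatunnel/"
```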
5. Quick Start Jobs
- Add a Job Configuration File: Edit config/v2.batch.config.template to define the input, processing, and output logic used once SeaTunnel starts. Below is an example configuration file.
env {
  execution.parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    result_table_name = "fake"
    row.num = 16
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
  }
}

sink {
  Console {}
}
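The job above exits after one batch pass. Assuming FakeSource also works in streaming mode, as in SeaTunnel's streaming template, the same pipeline can run continuously by switching the job mode in the env block:

```hocon
env {
  execution.parallelism = 1
  # Run continuously instead of exiting after one pass:
  job.mode = "STREAMING"
}
```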
- Run SeaTunnel: Start the application with the following command:
cd "apache-seatunnel-incubating-${version}"
./bin/seatunnel.sh --config ./config/v2.batch.config.template -e local
Check the output: when the command runs, its output appears in the console, which indicates whether the job ran successfully.
The SeaTunnel console will print some logs as follows:
2022-12-19 11:01:45,417 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - output rowType: name, age
2022-12-19 11:01:46,489 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=1: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: CpiOd, 8520946
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=2: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: eQqTs, 1256802974
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=3: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: UsRgO, 2053193072
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=4: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: jDQJj, 1993016602
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=5: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: rqdKp, 1392682764
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=6: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: wCoWN, 986999925
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=7: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: qomTU, 72775247
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=8: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: jcqXR, 1074529204
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=9: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: AkWIO, 1961723427
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=10: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: hBoib, 929089763
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=11: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: GSvzm, 827085798
2022-12-19 11:01:46,491 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=12: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: NNAYI, 94307133
2022-12-19 11:01:46,491 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=13: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: EexFl, 1823689599
2022-12-19 11:01:46,491 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=14: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: CBXUb, 869582787
2022-12-19 11:01:46,491 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=15: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: Wbxtm, 1469371353
2022-12-19 11:01:46,491 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=16: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: mIJDt, 995616438
6. SeaTunnel Integrates with Flink
Deploy and Configure Flink
Download Flink; the required version is 1.12.0 or higher.
Configure SeaTunnel: modify the settings in config/seatunnel-env.sh based on the path where your engine was installed during deployment, and change FLINK_HOME to the Flink deployment directory.
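After editing, config/seatunnel-env.sh might contain something like the fragment below (/opt/flink is an assumed install location, not a SeaTunnel default):

```shell
# config/seatunnel-env.sh -- point SeaTunnel at the engine installation.
# /opt/flink is an assumed path; use your actual Flink deployment directory.
FLINK_HOME=${FLINK_HOME:-/opt/flink}
```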
- Add a Job Configuration File: Edit config/v2.streaming.conf.template to define the input, processing, and output logic used once SeaTunnel starts. Below is an example configuration file, the same as the one above.
env {
  execution.parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    result_table_name = "fake"
    row.num = 16
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
  }
}

sink {
  Console {}
}
Run SeaTunnel
For Flink 1.12.x to 1.14.x:
cd "apache-seatunnel-incubating-${version}"
./bin/start-seatunnel-flink-13-connector-v2.sh --config ./config/v2.streaming.conf.template
For Flink 1.15.x to 1.16.x:
cd "apache-seatunnel-incubating-${version}"
./bin/start-seatunnel-flink-15-connector-v2.sh --config ./config/v2.streaming.conf.template
Check the output: when the command runs, its output appears in the console, which indicates whether the job ran successfully.
The SeaTunnel console will print some logs as follows:
2022-12-19 11:01:45,417 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - output rowType: name, age
2022-12-19 11:01:46,489 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=1: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: CpiOd, 8520946
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=2: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: eQqTs, 1256802974
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=3: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: UsRgO, 2053193072
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=4: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: jDQJj, 1993016602
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=5: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: rqdKp, 1392682764
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=6: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: wCoWN, 986999925
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=7: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: qomTU, 72775247
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=8: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: jcqXR, 1074529204
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=9: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: AkWIO, 1961723427
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=10: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: hBoib, 929089763
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=11: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: GSvzm, 827085798
2022-12-19 11:01:46,491 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=12: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: NNAYI, 94307133
2022-12-19 11:01:46,491 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=13: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: EexFl, 1823689599
2022-12-19 11:01:46,491 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=14: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: CBXUb, 869582787
2022-12-19 11:01:46,491 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=15: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: Wbxtm, 1469371353
2022-12-19 11:01:46,491 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=16: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: mIJDt, 995616438
7. Integrating SeaTunnel with Spark
Deployment and Configuration of Spark
Download Spark; the required version is 2.4.0 or higher.
Configure SeaTunnel: modify the settings in config/seatunnel-env.sh based on the path where your engine was installed during deployment, and change SPARK_HOME to the Spark deployment directory.
Add a Job Configuration File
Edit config/seatunnel.streaming.conf.template to define the input, processing, and output logic used once SeaTunnel starts. Here's an example configuration file, identical to the previous examples.
env {
  execution.parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    result_table_name = "fake"
    row.num = 16
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
  }
}

sink {
  Console {}
}
Run SeaTunnel
Start the application with one of the following commands:
For Spark 2.4.x:
cd "apache-seatunnel-incubating-${version}"
./bin/start-seatunnel-spark-2-connector-v2.sh \
  --master local[4] \
  --deploy-mode client \
  --config ./config/seatunnel.streaming.conf.template
For Spark 3.x.x:
cd "apache-seatunnel-incubating-${version}"
./bin/start-seatunnel-spark-3-connector-v2.sh \
  --master local[4] \
  --deploy-mode client \
  --config ./config/seatunnel.streaming.conf.template
View the output: when you run the command, its output appears in the console, which indicates whether the job ran successfully. The SeaTunnel console will print logs such as:
fields : name, age
types : STRING, INT
row=1 : elWaB, 1984352560
row=2 : uAtnp, 762961563
row=3 : TQEIB, 2042675010
row=4 : DcFjo, 593971283
row=5 : SenEb, 2099913608
row=6 : DHjkg, 1928005856
row=7 : eScCM, 526029657
row=8 : sgOeE, 600878991
row=9 : gwdvw, 1951126920
row=10 : nSiKE, 488708928
row=11 : xubpl, 1420202810
row=12 : rHZqb, 331185742
row=13 : rciGD, 1112878259
row=14 : qLhdI, 1457046294
row=15 : ZTkRx, 1240668386
row=16 : SGZCr, 94186144
8. Run Commands
Spark2:
bin/start-seatunnel-spark-2-connector-v2.sh --config config/v2.batch.config.template -m local -e client
Spark3:
bin/start-seatunnel-spark-3-connector-v2.sh --config config/v2.batch.config.template -m local -e client
Flink 1.13 and Flink 1.14:
bin/start-seatunnel-flink-13-connector-v2.sh --config config/v2.batch.config.template
Flink 1.15 and Flink 1.16:
bin/start-seatunnel-flink-15-connector-v2.sh --config config/v2.batch.config.template
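The engine-to-script mapping above can be captured in a small helper function (a hypothetical convenience, not part of the SeaTunnel distribution):

```shell
# Hypothetical helper: map an engine tag to its SeaTunnel starter script.
starter_for() {
  case "$1" in
    spark2)          echo "bin/start-seatunnel-spark-2-connector-v2.sh" ;;
    spark3)          echo "bin/start-seatunnel-spark-3-connector-v2.sh" ;;
    flink13|flink14) echo "bin/start-seatunnel-flink-13-connector-v2.sh" ;;
    flink15|flink16) echo "bin/start-seatunnel-flink-15-connector-v2.sh" ;;
    *)               echo "bin/seatunnel.sh" ;;  # default: SeaTunnel Engine
  esac
}

starter_for flink14
```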