What? Start this data integration platform with just a single command?
My name is Fan Jia, and I am a PMC member of the SeaTunnel community. My primary responsibilities include the design and development of Connector V2 and parts of the Zeta engine. I am delighted to share Apache SeaTunnel with you all today.
Today’s sharing will be divided into five sections:
1. Introducing SeaTunnel
2. How to use SeaTunnel
3. Core Features
4. Process Analysis
5. Future Plans and Vision
01 SeaTunnel Introduction
Apache SeaTunnel was created and open-sourced in 2017 under the name Waterdrop, and it has accumulated many users since then.
At the time, it was essentially a configuration layer over Spark and Flink. For example, to synchronize data from Kafka to HDFS or a data lake, you had to write the corresponding job code by hand. Waterdrop did away with that step: it generated tasks automatically from configuration files and submitted them to Flink or Spark, which made things far more convenient for users.
In 2021, Waterdrop was donated to the Apache Software Foundation and entered the Apache Incubator, where it was renamed Apache SeaTunnel. It has now been incubating for nearly two years and is currently in the vote to become a top-level project.
The goal of Apache SeaTunnel is to create an easy-to-use, high-performance data integration platform that supports real-time streaming and offline batch processing.
02 How to use SeaTunnel
It takes only one command to run SeaTunnel. A user needs just two things: a configuration and a startup command, with the configuration written before startup. Here’s an example of a startup command:
./bin/seatunnel.sh -c config/v2.batch.config.template -m local
It’s that simple! Specify the shell script, the configuration file location, and a startup mode, and the synchronization or data processing task is launched. No further attention is required; the task runs to completion on its own.
First we need a Config. The Config tells the program where to read data from, how to transform it, and where to output or write it; the entire job logic is expressed through the Config.
For example, a Config declares a Source such as JDBC to read from MySQL, a Sink to write to Hive, and any number of Transforms in between, such as filtering rows or renaming fields.
Of course, Transform does not offer the powerful join capabilities of Flink or Spark; joins belong more to data analysis and computation. SeaTunnel is positioned as a data integration and synchronization platform, so Transform focuses on data conversion and supports some SQL-like features.
By defining a configuration file and executing a command, we can complete a data synchronization task.
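To make this concrete, here is a minimal sketch of such a Config, assuming the v2 config format; the connection details and connector options are placeholders and vary by version:

env {
  job.mode = "BATCH"
  execution.parallelism = 2
}

source {
  Jdbc {
    url = "jdbc:mysql://localhost:3306/test"   # placeholder connection info
    driver = "com.mysql.cj.jdbc.Driver"
    user = "root"
    password = "123456"
    query = "select id, name, age from source_table"
    result_table_name = "source_table"
  }
}

transform {
  # optional: filter or rename fields here, e.g. with the SQL transform
}

sink {
  Hive {
    table_name = "test_db.sink_table"          # placeholder target table
    metastore_uri = "thrift://localhost:9083"  # placeholder metastore address
  }
}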
What we want is a simple, efficient data integration tool that removes the heavy coding work of the past. With SeaTunnel, data synchronization is implemented entirely by declaring a configuration file.
03 SeaTunnel Core Features
No code means exactly that: no coding is required. Writing code has its advantages, but it is not an efficient or fast way to produce a data synchronization task.
Distributed: other data integration tools also drive tasks through a Config; DataX, for example, declares and runs tasks this way, but on a single node. SeaTunnel’s advantage is distributed processing: it can rely on Flink or Spark to run a single task across multiple nodes, and its built-in engine also supports distributed execution, spreading a task over multiple nodes to improve overall efficiency.
As mentioned earlier, the same Config can run on the Spark, Flink, and SeaTunnel engines. Supporting three different engines takes a great deal of work on the framework side, but on the user side almost nothing changes: the Config stays the same, and only the shell submission command is switched. To submit to our engine, use the Zeta submission command; to submit to Flink, use the corresponding Flink command, which submits the defined data integration task to the appropriate cluster.
Neither the configuration nor the existing architecture needs to change; only the shell command does. This lets one task run on multiple engines, and SeaTunnel’s features don’t stop there.
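For example (script names vary slightly across SeaTunnel releases, so treat these as illustrative):

# Zeta engine, local mode
./bin/seatunnel.sh -c config/v2.batch.config.template -m local

# Flink
./bin/start-seatunnel-flink-connector-v2.sh -c config/v2.batch.config.template

# Spark
./bin/start-seatunnel-spark-connector-v2.sh -c config/v2.batch.config.template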
Seamless streaming and batch processing: changing a single parameter in the Config defines whether a task runs in batch or streaming mode. Everything else stays the same, allowing smooth transitions between the two modes.
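Concretely, the switch is one setting in the Config’s env block:

env {
  job.mode = "BATCH"   # change to "STREAMING" to run the same job as a streaming task
}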
Data consistency is essential: the whole pipeline must be implemented reliably so that data is neither lost nor duplicated.
Cluster fault tolerance is also supported. If the cluster or its Master node runs into problems, tasks must keep running smoothly. This is currently implemented in our engine.
Support for CDC will be introduced in detail by a community member later.
High-performance data synchronization requires, first, that the read and write paths themselves are correct and fast, and second, that the work is distributed: a task is divided into multiple subtasks assigned to different Workers, so multiple nodes read and write in parallel and each Worker handles its own share. Together, this delivers high-performance synchronization.
These are some of our core features, which we haven’t listed exhaustively. If you’re interested, you can check out SeaTunnel’s official website or join the community for discussions.
04 SeaTunnel Process Analysis
Taking MySQL to StarRocks as an example, we define the Source as JDBC, reading data from MySQL, and the Sink as StarRocks. The process analysis below is based on the Zeta engine: Flink and Spark are independent open-source projects that we cannot modify extensively, so many new features land in the Zeta engine.
1. This step is writing the Config file. A Config file can contain multiple Sources, Transforms, and Sinks, not just the single Source and Sink shown in the demo. Because one configuration file can express several pipelines, complex synchronization tasks and related business processing can be handled in a single file, with no need for multiple configuration files and tasks. A sketch of such a multi-pipeline Config follows below.
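A hedged sketch, where each source names its output table and downstream transforms and sinks reference those names (all connection values are placeholders):

source {
  Jdbc {
    url = "jdbc:mysql://localhost:3306/shop"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "root"
    password = "123456"
    query = "select * from orders"
    result_table_name = "orders"
  }
  Jdbc {
    url = "jdbc:mysql://localhost:3306/shop"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "root"
    password = "123456"
    query = "select * from users"
    result_table_name = "users"
  }
}

transform {
  Sql {
    source_table_name = "orders"
    result_table_name = "orders_filtered"
    query = "select * from orders where amount > 0"
  }
}

sink {
  Console { source_table_name = "orders_filtered" }
  Console { source_table_name = "users" }
}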
2. Choose the execution engine: Zeta, Flink, or Spark. After selecting the engine, the task is submitted. We currently support submission via shell, with HTTP in the planning stage. In the future we will also support an SDK, letting users declare jobs in Java code, for example by building the Config programmatically or enriching it with information queried from a database, and then submitting the task to the cluster through the SDK. These diverse submission methods make integration with user systems more convenient.
Using the Zeta engine as an example: submitting the Config through the shell command starts a client-side endpoint, which registers with the Master node automatically. Unlike Flink and Spark, where a Master address must be specified, Master nodes here are discovered through node registration. The user-defined Config is then parsed into Actions the engine recognizes, one for each Source, Transform, and Sink.
If you define three Actions, each undergoes the corresponding initialization. Initialization varies by connector (JDBC, Doris, StarRocks, and so on), each with its own logic to make sure the required resources are ready.
Once initialized, the Actions are assembled into the logical plan, which defines the whole task’s execution process. When the client submits the logical plan to the Master, the Master optimizes it and generates the corresponding physical execution plan. Since this is a distributed system with multiple nodes, the physical plan is broken into multiple tasks, each sent to a different Worker node. The Workers run their subtasks, and the Master manages all of them.
The Master here is called the coordinator. It listens for, or periodically collects, task execution statuses, aggregates them, and performs recovery when necessary; if a task fails, the Master handles the failure. During the task initialization phase, the coordinator also distributes task groups to the different Workers.
3. How are Workers and Masters registered? This is also automatic. With four nodes, for example, you don’t designate which are Workers and which is the Master; the engine assigns the roles for you. A sketch of the cluster configuration follows below.
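Assuming Zeta’s Hazelcast-based clustering (the file name and keys may differ by version), every node ships the same member list and no Master/Worker roles are declared:

# hazelcast.yaml - identical on every node; roles are decided by the engine
hazelcast:
  cluster-name: seatunnel
  network:
    join:
      tcp-ip:
        enabled: true
        member-list:
          - node1
          - node2
          - node3
          - node4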
What happens if the Master fails?
Many centralized data platforms and scheduling systems face the single point of failure problem. The Master manages all tasks and must know their statuses, since it is responsible for unified coordination; if a Worker fails, the tasks on it fail and must be recovered, which users may otherwise find unacceptable. But what if the Master itself fails? To address this, the Master periodically persists its job information to S3, HDFS, or local storage. When the Master goes down, a new Master is automatically elected from the remaining nodes; it reloads the persisted information about running jobs, performs recovery, and reschedules execution. This covers the engine itself, but a data integration tool in real use needs not only the engine but also the various connectors working closely with it.
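As a sketch, persisting checkpoint state to HDFS might be configured in the engine’s seatunnel.yaml roughly like this (key names follow the docs of the time; verify against your version):

seatunnel:
  engine:
    checkpoint:
      interval: 10000            # ms between state snapshots
      timeout: 60000
      storage:
        type: hdfs               # S3-compatible and local storage are also options
        max-retained: 3
        plugin-config:
          namespace: /tmp/seatunnel/checkpoint_snapshot
          storage.type: hdfs
          fs.defaultFS: hdfs://localhost:9000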
4. Connectors matter too. JDBC, for example, supports concurrent reading, but that requires data partitioning, so that different Workers and Tasks can each read their share concurrently. With MySQL, you might define nine partitions with a parallelism of three, so each Task reads three partitions, split on a chosen column. On the write side, StarRocks’ Stream Load API can be used to speed up writing, completing the data synchronization or integration faster. A sketch follows below.
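Here the JDBC source’s partition options follow the documented names, but treat the exact values and the StarRocks sink options as illustrative for your connector version:

env {
  job.mode = "BATCH"
  execution.parallelism = 3              # three subtasks share the partitions
}

source {
  Jdbc {
    url = "jdbc:mysql://localhost:3306/test"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "root"
    password = "123456"
    query = "select * from orders"
    partition_column = "id"              # numeric column used to split the read
    partition_num = 9                    # nine partitions, about three per subtask
  }
}

sink {
  StarRocks {
    nodeUrls = ["starrocks-fe:8030"]     # Stream Load goes through the FE HTTP port
    username = "root"
    password = ""
    database = "test"
    table = "orders"
  }
}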
5. For task state recovery, job-level information alone is not enough. Each job has its own runtime state, such as how much data has been read and written and how many uncommitted transactions remain. This information is needed to roll a task back properly and keep the data intact. So how is this achieved?
A common industry solution is to periodically snapshot the status of each task, much like Flink’s checkpoint mechanism. On an exception, the saved state is used to roll back or recover the task, ensuring a smooth recovery. Combined with the Sink’s two-phase commit, exactly-once processing can be achieved. That covers the entire operation process.
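For instance, assuming the Zeta engine, the snapshot interval is declared in the job’s env block, and (in versions that support it) a job can be stopped with a savepoint and restored later:

env {
  job.mode = "STREAMING"
  checkpoint.interval = 10000   # snapshot task state every 10 seconds
}

# stop the job with a savepoint, then restore it from that state
./bin/seatunnel.sh -s <jobId>
./bin/seatunnel.sh -c config/job.conf -r <jobId>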
05 SeaTunnel Vision and Outlook
1. Automatic table creation
SeaTunnel aims to support automatic table creation, which is a feature currently under development. This feature would allow users to synchronize an entire database with just one configuration file.
Pain point: When syncing data, having to create tables in the destination system beforehand can be a hassle. Automatic table creation addresses this issue by creating the necessary tables in the destination system based on the source system’s schema.
2. Enhanced monitoring metrics
While the Zeta engine currently provides monitoring metrics, improvements are needed to better integrate these metrics with existing enterprise monitoring systems.
3. Schema evolution
SeaTunnel is currently working on a feature to support schema evolution. This feature will allow the system to automatically detect and adapt to changes in the source and destination schemas, such as new tables, modified columns, or altered data types.
4. Handling dirty data
In production environments, it is not uncommon for some data to be corrupted or otherwise unsuitable for syncing between systems. SeaTunnel aims to filter out dirty data without stopping the synchronization process, allowing users to address the issue separately without impacting the overall data flow.
5. Flow control
SeaTunnel plans to implement flow control features, allowing users to regulate the speed of data synchronization and reduce the load on both source and destination systems.
6. Containerization support
SeaTunnel is working on supporting containerization and plans to provide support for Kubernetes and Docker, making it even more convenient for users to deploy and manage their data synchronization tasks.
This concludes the presentation on SeaTunnel. If you are interested in learning more or getting involved in the community, feel free to reach out and join the conversation!