Visual Task Orchestration & Drag-and-Drop: Scaleph Data Integration Practice Based on Apache SeaTunnel (Incubating)

Apache SeaTunnel
Jul 7, 2022


At the Apache SeaTunnel meetup in June, the topic of Scaleph data integration practice based on Apache SeaTunnel (Incubating) was shared, and we hope you will get a lot out of this talk.

Summary of the talk:

● About Scaleph

● Introduction to Scaleph architecture and features

● Apache SeaTunnel Community Contribution

● Demonstration of the system

● Follow-up Development Plan

Qi Wang, an Apache SeaTunnel Contributor, Search and Recommendation Engineer, and Big Data Java Developer

01 Origin of Scaleph Tools

During my early work in search and recommendation, I was in charge of maintaining the Dump system, which is mainly responsible for indexing data into the search engine.

We faced five troublesome problems in maintaining the system.

Timeliness and stability

Search and recommendation is the core online system of an e-commerce platform, which imposes particularly strict requirements on data timeliness and stability. Since the search and recommendation system bears most of the consumer traffic of the entire e-commerce platform, any service fluctuation may degrade the service and bring a worse experience to consumers.

Business complexity/large wide table design

The Dump system runs a series of pre-processing steps over the real-time/offline data and model data of products, categories, brands, stores, product tags, and the data warehouse of the e-commerce platform, and finally outputs a large wide table. During this process, the complexity and variability of the upstream business leak into the Dump system and pose serious technical challenges.

Full + real-time indexing

The full index is run once a day and mainly updates data at T+1 frequency. After the full index finishes, we use the real-time index to refresh the data that needs to be updated in real time, e.g. product prices, inventory changes, etc.

Data linkage update

We have many upstream data sources, including message queues, databases, big data storage, and Dubbo interfaces. Because of the large wide table design, a product index is product-based and a store index is store-based; since upstream data changes do not always arrive in the product or store dimension, a change will also trigger linked updates across dimensions.

The mission to protect the data

The search and recommendation service bears most of the consumer traffic on the entire e-commerce platform. When other teams' systems could not keep up with that load, they would push their data into the search engine through the Dump system, and our team would return it to the Web page on their behalf, avoiding a subsequent second request to their services.

At the same time, if another team's business system generates dirty data, the Dump system must protect against it so that the dirty data does not leak to users and cause a bad experience, which makes the system very difficult to develop and maintain.

02 Why introduce Flink?

As an early user of Flink in China, Alibaba has a long and successful history of applying it to search and recommendation. My experience developing and maintaining the Dump system on the search and recommendation team prompted me to focus on using Flink for work beyond A/B experiment reports and real-time data flows, such as implementing the Dump system with Flink to provide data indexing for search engines.

There are 5 advantages of using Flink for data integration:

1. Native distributed support: Flink supports multiple deployment and operation modes, including Standalone, YARN, and Kubernetes;

2. Low latency, high throughput: Flink is widely used in many large internet companies, and these capabilities have been proven in production environments;

3. Ecosystem support: Flink provides many out-of-the-box connectors, supports data formats such as CSV and Avro, messaging systems such as Kafka and Pulsar, and many storage systems, and is closely integrated with the big data ecosystem;

4. Exactly-once semantics based on a distributed, lightweight asynchronous snapshot mechanism, providing data consistency guarantees across task failure, restart, migration, upgrade, etc.;

5. In addition to the metrics provided by Flink itself, the metrics framework allows users to develop custom metrics for their tasks to enrich monitoring (points 4 and 5 are sketched in the example after this list).
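To make points 4 and 5 concrete, here is a minimal Flink sketch (not code from Scaleph) that enables exactly-once checkpointing and registers a custom counter metric; the class and job names are illustrative.

```java
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointAndMetricsExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Point 4: enable the lightweight asynchronous snapshot mechanism (checkpoints)
        // every 60s so records are processed with exactly-once semantics across failures.
        env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

        env.fromElements("a", "b", "c")
           .map(new CountingMapper())
           .print();

        env.execute("checkpoint-and-metrics-example");
    }

    // Point 5: a user-defined metric registered through Flink's metrics framework.
    static class CountingMapper extends RichMapFunction<String, String> {
        private transient Counter processed;

        @Override
        public void open(Configuration parameters) {
            processed = getRuntimeContext().getMetricGroup().counter("recordsProcessed");
        }

        @Override
        public String map(String value) {
            processed.inc();
            return value.toUpperCase();
        }
    }
}
```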

03 Why Apache SeaTunnel?

I loved the design concept of Apache SeaTunnel when I first encountered it! Apache SeaTunnel is a next-generation high-performance, distributed, massive data integration framework running on Flink and Spark.

Importantly, Apache SeaTunnel works out of the box and integrates seamlessly with the existing ecosystem: because it runs on Flink and Spark, it can easily reuse a company's existing Flink and Spark infrastructure. On the other hand, there are many successful production use cases of Apache SeaTunnel. Besides, since entering incubation at the Apache Software Foundation, the community has become more active and the future is promising.

04 About Scaleph

The initial purpose of the project

Our initial aim was to provide a Web UI for Apache SeaTunnel and grow it into an open-source data integration system. At present, our main goal is to build an open-source visual data development and management system for Apache SeaTunnel; we expect Scaleph to minimize the development threshold of real-time and offline data tasks and provide a one-stop data development platform for developers.

Project Features

In real production use, data integration is mainly done through visual task orchestration or SQL development. We believe that drag-and-drop visual task orchestration can minimize the burden of data integration for users.

Other goals include multi-version management of jobs and data source support, as well as:

  • Flink cluster support for multi-version/multi-deployment environments.
  • Support for real-time/periodic tasks.
Core Architecture

Above is the architecture diagram of our system. Through the job management function in the Web UI, users drag, drop, and configure Apache SeaTunnel operators on the page; the system automatically generates the Apache SeaTunnel configuration file and submits it to the Flink cluster through the Flinkful library, together with the resource jar packages users uploaded in resource management. The purpose of resource management is to let users upload their own jar packages, fill gaps in Apache SeaTunnel, or extend Apache SeaTunnel and Flink functionality.

We also developed a scheduled task with Quartz. After a job is submitted to Flink, this task periodically pulls job information from the Flink cluster and stores it in MySQL, so the end user can see the runtime information of the job on the Web UI.
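A rough sketch of what such a Quartz polling job can look like, assuming Flink's standard REST endpoint /jobs/overview and an illustrative MySQL table and credentials; the actual Scaleph implementation differs in detail.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import org.quartz.Job;
import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.Scheduler;
import org.quartz.SimpleScheduleBuilder;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.impl.StdSchedulerFactory;

// Quartz job that pulls the job overview from Flink's REST API and stores the raw
// JSON snapshot in MySQL; endpoint, table, and credentials are placeholders.
public class FlinkJobStatusPollJob implements Job {

    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        try {
            // Flink's REST endpoint /jobs/overview returns the status of all jobs.
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create("http://flink-jobmanager:8081/jobs/overview"))
                    .GET()
                    .build();
            String body = HttpClient.newHttpClient()
                    .send(request, HttpResponse.BodyHandlers.ofString())
                    .body();

            // Persist the snapshot so the Web UI can render runtime information.
            try (Connection conn = DriverManager.getConnection(
                         "jdbc:mysql://localhost:3306/scaleph", "user", "password");
                 PreparedStatement ps = conn.prepareStatement(
                         "INSERT INTO flink_job_log (payload) VALUES (?)")) {
                ps.setString(1, body);
                ps.executeUpdate();
            }
        } catch (Exception e) {
            throw new JobExecutionException(e);
        }
    }

    public static void main(String[] args) throws Exception {
        Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
        JobDetail job = JobBuilder.newJob(FlinkJobStatusPollJob.class)
                .withIdentity("flink-job-status-poll").build();
        Trigger trigger = TriggerBuilder.newTrigger()
                .withIdentity("every-30s")
                .withSchedule(SimpleScheduleBuilder.repeatSecondlyForever(30))
                .build();
        scheduler.scheduleJob(job, trigger);
        scheduler.start();
    }
}
```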

Scaleph Features Brief (Data Development)

Project Management

When users create data synchronization tasks, they can manage them according to different business dimensions.

Job Management

Apache SeaTunnel synchronization tasks can be created by drag-and-drop, then submitted and run.

Resource Management

Apache SeaTunnel is open source under the Apache 2.0 license, which is not compatible with the license of MySQL's JDBC driver, so Apache SeaTunnel's jdbc connector does not ship the JDBC driver dependency. When users use the jdbc connector, they need to provide their own JDBC driver package. We therefore provide a resource management function so that users can upload the driver package themselves and submit Apache SeaTunnel tasks to the cluster together with the MySQL driver, ensuring the tasks run properly.

Cluster Management

Supports configuring Flink cluster information. At present, Standalone Session mode is supported. After users enter the cluster information, they can select the corresponding cluster when submitting Apache SeaTunnel jobs, and the jobs will run on that cluster.

Data source management

Allows users to enter data source information in advance, so they do not have to enter it again for every task. Data source sharing and permission restrictions are also supported to prevent data source credentials from leaking.

Scaleph Features Brief (operation and maintenance center)

The operation and maintenance center keeps the running records of real-time and periodic tasks, so users can see task-related information after submitting tasks. We also provide a jump link: by clicking it, users can go to Flink's Web UI and view the detailed execution information of their tasks there.

Scaleph Features Brief (data standards)

Data Element

Data governance is a big field. Although people usually pay more attention to metadata, data lineage, or data assets, data standards are also an important part of data governance, so we open-sourced our internally used standards system to share knowledge about data standards.

Because data warehouse development is usually a collaborative effort, fields with the same business meaning may be defined differently in different model tables by different developers. Data standards are expected to unify the model field definitions across data warehouse developers through data elements.

Reference Data

The data in the data warehouse is pulled from business systems through data integration tools, and it is inevitable that fields with the same meaning are defined differently in different business systems. These fields have to be reconciled by data warehouse staff, which is usually done in offline documents that easily become outdated.

At the same time, business knowledge may not map directly onto the data warehouse model, and the data standard allows users to maintain this business knowledge on the Web page.

The figure above shows a case: two business systems, A and B, define gender with different enumeration values, and their enumeration descriptions also differ. How should we handle this?

For this case, the data warehouse developers can define a unified standard, for example unifying the codes as 0, 1, and 2 with corresponding descriptions, so that users can easily look up the information through a reference data mapping.
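Sketched in Java with made-up enumeration values for systems A and B, the reference data mapping amounts to something like this:

```java
import java.util.Map;

// Illustrative reference-data mapping: source-system gender codes are translated
// into one standard code set defined by the warehouse developers.
public class GenderReferenceMapping {

    // Standard codes: 0 = unknown, 1 = male, 2 = female (example definition only).
    private static final Map<String, Integer> SYSTEM_A = Map.of("M", 1, "F", 2, "U", 0);
    private static final Map<String, Integer> SYSTEM_B = Map.of("male", 1, "female", 2, "n/a", 0);

    public static int toStandard(String system, String sourceCode) {
        Map<String, Integer> mapping = "A".equals(system) ? SYSTEM_A : SYSTEM_B;
        // Fall back to the "unknown" standard code when a source value is unmapped.
        return mapping.getOrDefault(sourceCode, 0);
    }

    public static void main(String[] args) {
        System.out.println(toStandard("A", "M"));      // 1
        System.out.println(toStandard("B", "female")); // 2
    }
}
```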

Follow-up ideas

In data integration, transformation can then be performed automatically based on the data standard, so that business knowledge and models are maintained and mapped automatically.

Scaleph features highlights

Visual data development. We believe that in the field of data synchronization, visual drag-and-drop lets users quickly create data integration tasks by dragging and dropping a couple of operators and filling in the corresponding parameters.

Flinkful is a Java client we developed for Flink.

As a popular computing engine, Flink offers several ways to use it, such as a command-line interface and an HTTP (REST) interface. Through the command-line interface, users can submit, create, and cancel tasks; the HTTP interface mainly serves the Web UI.

While integrating with Flink, we found that although Flink and Scaleph are both JVM applications, integrating the two had to go through shell scripts, which is quite unreasonable. So we developed Flinkful to expose Flink's capabilities to the Java ecosystem and let users manage Flink clusters and tasks directly through Flinkful.

We think Flinkful is even more meaningful for Flink infrastructure maintainers, so it was split out of the Scaleph repository and open-sourced separately.

As for the plug-in system, we want to define plug-ins that provide extension interfaces through which users and Scaleph developers can quickly enhance Scaleph's functionality and features. Currently, we have defined two plug-ins, the Data Source plug-in and the Apache SeaTunnel plug-in. Through the Data Source plug-in, you can quickly add data sources such as JDBC, ES, Kafka, ClickHouse, etc., and centralize them in the Scaleph system for unified configuration and use.

Currently, Apache SeaTunnel provides so many connector and transform plug-ins that developing support for them one by one is rather time-consuming, so we are trying to figure out a simple, declarative way to define the Apache SeaTunnel-related parameters, through which users can quickly bring Apache SeaTunnel's capabilities into Scaleph.
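One possible shape of such a declarative definition, shown as a purely hypothetical Java sketch (none of these types exist in Scaleph as written): connector parameters are described as data, and both the Web form and the generated SeaTunnel configuration could be derived from the same definition.

```java
import java.util.List;

// Hypothetical property descriptor: name, type, whether it is required, and a default.
record PluginProperty(String name, String type, boolean required, String defaultValue) {}

// Hypothetical descriptor for one SeaTunnel connector plug-in.
record PluginDescriptor(String pluginName, List<PluginProperty> properties) {}

class JdbcSourceDescriptorExample {
    // Declares a jdbc source's parameters as data rather than hand-written UI code.
    static PluginDescriptor jdbcSource() {
        return new PluginDescriptor("JdbcSource", List.of(
                new PluginProperty("driver", "string", true, null),
                new PluginProperty("url", "string", true, null),
                new PluginProperty("username", "string", true, null),
                new PluginProperty("password", "string", true, null),
                new PluginProperty("query", "string", true, null),
                new PluginProperty("partition_column", "string", false, null)
        ));
    }
}
```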

Problem analysis

Flink-jdbc-connector feature enhancement

Many of the examples in the official Apache SeaTunnel documentation use FakeSource and ConsoleSink, while we develop mainly with the jdbc-connector. During integration, we found that the JdbcSink of the flink-jdbc-connector plug-in only supported Stream mode, so we added Batch mode support to it.

JdbcSource requires the user to provide SQL, and the program internally extracted the column and table information from the SQL with regular expressions to generate the RowTypeInfo of the JdbcSource. But complex SQL contains aliases and subqueries, and regular expressions cannot cover all of these scenarios. We instead use a JDBC Connection to obtain the result set of the SQL and read the column information directly from it to generate the RowTypeInfo of the JdbcSource.
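A simplified sketch of that approach: prepare the statement, read the column labels and types from its metadata, and build the RowTypeInfo from them (a real implementation maps many more JDBC types, and some drivers only expose metadata after execution).

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSetMetaData;
import java.sql.Types;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;

public class RowTypeInfoFromSql {

    // Derive the output RowTypeInfo from the query itself instead of parsing the SQL
    // with regular expressions, so aliases and subqueries are handled naturally.
    public static RowTypeInfo rowTypeInfo(String jdbcUrl, String sql) throws Exception {
        try (Connection conn = DriverManager.getConnection(jdbcUrl);
             PreparedStatement ps = conn.prepareStatement(sql)) {
            ResultSetMetaData meta = ps.getMetaData();
            int n = meta.getColumnCount();
            String[] names = new String[n];
            TypeInformation<?>[] types = new TypeInformation<?>[n];
            for (int i = 1; i <= n; i++) {
                names[i - 1] = meta.getColumnLabel(i);
                types[i - 1] = toTypeInformation(meta.getColumnType(i));
            }
            return new RowTypeInfo(types, names);
        }
    }

    // Minimal JDBC-to-Flink type mapping; a real implementation covers many more types.
    private static TypeInformation<?> toTypeInformation(int jdbcType) {
        switch (jdbcType) {
            case Types.INTEGER:
                return BasicTypeInfo.INT_TYPE_INFO;
            case Types.BIGINT:
                return BasicTypeInfo.LONG_TYPE_INFO;
            case Types.DOUBLE:
                return BasicTypeInfo.DOUBLE_TYPE_INFO;
            case Types.DECIMAL:
                return BasicTypeInfo.BIG_DEC_TYPE_INFO;
            default:
                return BasicTypeInfo.STRING_TYPE_INFO;
        }
    }
}
```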

Slimming the Apache SeaTunnel-core-flink.jar

Apache SeaTunnel runs on top of Flink and Spark, and each engine is built into its own jar package; Apache SeaTunnel-core-flink.jar is the Flink implementation. In version 2.1.1, Apache SeaTunnel puts all Flink-based connectors into this fat jar.

In actual use, a data synchronization task may only use one or two connectors, so submitting the whole fat jar with every Apache SeaTunnel task incurs a certain amount of extra network overhead.

What we want is a relatively thin core jar package plus separate connector jar packages: on submission, the core jar carries the bulk of the code, and only the required connector jars accompany it. Resource jar uploads, introduced earlier for cases such as the JDBC driver missing from Apache SeaTunnel's jdbc-connector, allow resource jars and connector jars to be submitted in the same way.

We also actively shared our experience on connector splitting under the relevant issue, and when Apache SeaTunnel 2.1.2 was released, our system adapted easily to the new release form of a separate Apache SeaTunnel-core-flink.jar and connector jars. At the same time, if users have not prepared the JDBC driver in the Flink cluster in advance, they can upload the driver package through resource management and submit the Apache SeaTunnel job together with it.

Flink jobId acquisition problem

The primary way to submit Flink tasks is the command-line interface, so users submit Flink tasks through shell scripts. After a task is submitted, the command-line client prints the corresponding job id to the console log, and users have to capture that console output to extract the job id from it.

All interactions between our project and Flink go through the Flinkful library, which returns the jobId directly as the return value of a method call, so our implementation is more elegant than scraping the console log to extract the jobId.
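For comparison, this is roughly what programmatic submission looks like against Flink's own client API, where the JobID comes back as a method return value instead of console output; Flinkful's actual API differs, and the jar path, config file, and addresses below are placeholders.

```java
import java.io.File;

import org.apache.flink.api.common.JobID;
import org.apache.flink.client.deployment.StandaloneClusterId;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;

public class ProgrammaticSubmitExample {
    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        config.setString(JobManagerOptions.ADDRESS, "flink-jobmanager");
        config.setInteger(RestOptions.PORT, 8081);

        // Build a JobGraph from the user jar (e.g. the SeaTunnel core jar plus its config).
        PackagedProgram program = PackagedProgram.newBuilder()
                .setJarFile(new File("/path/to/seatunnel-core-flink.jar"))
                .setArguments("--config", "/path/to/job.conf")
                .build();
        JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, config, 1, false);

        try (RestClusterClient<StandaloneClusterId> client =
                     new RestClusterClient<>(config, StandaloneClusterId.getInstance())) {
            // The JobID is simply the return value — no console-log scraping needed.
            JobID jobId = client.submitJob(jobGraph).get();
            System.out.println("Submitted job: " + jobId);
        }
    }
}
```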

Apache SeaTunnel calls System.exit() issue

An Apache SeaTunnel task first checks the user-written configuration file before executing, and if the check fails, it directly calls System.exit(), which shuts down the JVM. Since Apache SeaTunnel submission is normally done through a shell script, the JVM exiting is not a problem there.

But when we integrated Apache SeaTunnel into our application, this call would shut Scaleph down directly and make our service unavailable. Therefore, we added a security restriction to the task submission code through a SecurityManager that forbids calling System.exit() during the Apache SeaTunnel task submission process.
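The guard can be sketched like this: a SecurityManager that turns System.exit() into a catchable exception for the duration of the submission call (simplified compared with the real code; the class and method names are illustrative).

```java
// Temporarily install a SecurityManager that rejects System.exit() while the
// SeaTunnel config check / submission code runs inside the host JVM.
public class NoExitGuard {

    static class NoExitSecurityManager extends SecurityManager {
        @Override
        public void checkExit(int status) {
            // Turn the JVM shutdown into a catchable exception instead.
            throw new SecurityException("System.exit(" + status + ") is not allowed here");
        }

        @Override
        public void checkPermission(java.security.Permission perm) {
            // Allow everything else.
        }

        @Override
        public void checkPermission(java.security.Permission perm, Object context) {
            // Allow everything else.
        }
    }

    public static void runWithoutExit(Runnable submission) {
        SecurityManager previous = System.getSecurityManager();
        System.setSecurityManager(new NoExitSecurityManager());
        try {
            submission.run();
        } catch (SecurityException e) {
            // The config check failed and tried to exit; report it instead of dying.
            System.err.println("Submission aborted: " + e.getMessage());
        } finally {
            System.setSecurityManager(previous);
        }
    }
}
```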

05 Contribution to the Apache SeaTunnel Community

I submitted some PRs together with a friend of mine, who is also one of the Scaleph developers, such as the jdbc-connector enhancements mentioned above and the implementation of the upsert function for the jdbc-connector. The flink-jdbc-connector's JdbcSink has an obvious flaw: it only supports insert, not update, which limits the connector's usefulness quite a bit. So we developed support for upsert semantics to allow data to be synchronized repeatedly.
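Conceptually, upsert support boils down to statements like the one below. This sketch uses Flink's official JdbcSink and MySQL's ON DUPLICATE KEY UPDATE purely to illustrate the semantics; it is not the SeaTunnel connector code, and the table and connection details are made up.

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.types.Row;

public class UpsertSinkExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // With a plain INSERT, re-running the sync job on the same keys fails or
        // duplicates rows; an upsert statement makes the synchronization repeatable.
        String upsertSql =
                "INSERT INTO product(id, name, price) VALUES (?, ?, ?) "
              + "ON DUPLICATE KEY UPDATE name = VALUES(name), price = VALUES(price)";

        SinkFunction<Row> sink = JdbcSink.sink(
                upsertSql,
                (statement, row) -> {
                    statement.setLong(1, (Long) row.getField(0));
                    statement.setString(2, (String) row.getField(1));
                    statement.setDouble(3, (Double) row.getField(2));
                },
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://localhost:3306/demo")
                        .withDriverName("com.mysql.cj.jdbc.Driver")
                        .withUsername("user")
                        .withPassword("password")
                        .build());

        // The second element hits the same key and updates the row instead of failing.
        env.fromElements(Row.of(1L, "apple", 3.5d), Row.of(1L, "apple", 3.8d))
           .returns(Types.ROW(Types.LONG, Types.STRING, Types.DOUBLE))
           .addSink(sink);

        env.execute("jdbc-upsert-example");
    }
}
```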

Follow-up development plan

We will bring all the Apache SeaTunnel connectors and transform plug-ins to our visual drag-and-drop page as soon as possible so that users can experience the full power of Apache SeaTunnel. Another direction is to enrich the data source types corresponding to the connectors as the Apache SeaTunnel connector plug-ins grow.

We also hope to add DAG orchestration capability for data development and data integration, and to support SQL task development in data development.

About Apache SeaTunnel

Apache SeaTunnel (formerly Waterdrop) is an easy-to-use, ultra-high-performance distributed data integration platform that supports real-time synchronization of massive amounts of data and can stably and efficiently synchronize hundreds of billions of records per day.

Why do we need Apache SeaTunnel?

Apache SeaTunnel does everything it can to solve the problems you may encounter in synchronizing massive amounts of data.

  • Data loss and duplication
  • Task buildup and latency
  • Low throughput
  • Long application-to-production cycle time
  • Lack of application status monitoring

Apache SeaTunnel Usage Scenarios

  • Massive data synchronization
  • Massive data integration
  • ETL of large volumes of data
  • Massive data aggregation
  • Multi-source data processing

Features of Apache SeaTunnel

  • Rich components
  • High scalability
  • Easy to use
  • Mature and stable

How to get started with Apache SeaTunnel quickly?

Want to experience Apache SeaTunnel quickly? SeaTunnel 2.1.0 takes 10 seconds to get you up and running.

https://seatunnel.apache.org/docs/2.1.0/developement/setup

How can I contribute?

We invite all partners who are interested in making local open-source global to join the Apache SeaTunnel contributors family and foster open-source together!

Submit an issue:

https://github.com/apache/incubator-seatunnel/issues

Contribute code to:

https://github.com/apache/incubator-seatunnel/pulls

Subscribe to the community development mailing list:

dev-subscribe@seatunnel.apache.org

Development mailing list:

dev@seatunnel.apache.org

Join Slack:

https://join.slack.com/t/apacheseatunnel/shared_invite/zt-1kcxzyrxz-lKcF3BAyzHEmpcc4OSaCjQ

Follow Twitter:

https://twitter.com/ASFSeaTunnel

Come and join us!
