Extending Apache SeaTunnel: Customizing SQL Transform and Implementing Join Functionality

Apache SeaTunnel
10 min read · Jul 12, 2023


Hello everyone, I’m Ma Chengyuan from Hengsheng Electronic. I work on big data infrastructure research and development, I’m a contributor to Apache SeaTunnel, and I was previously a committer for Canal, Alibaba’s open-source component.

Speaker Introduction

Chengyuan Ma

Community Contributor

01 SeaTunnel Transform Plugin

Today, I would like to share how Apache SeaTunnel’s SQL transform plugin is implemented. I will cover several aspects: first the principles of Apache SeaTunnel’s transform plugins, then the implementation of the SQL transform plugin, and then the implementation of UDFs within it.

After that, I will extend the discussion to two further topics: extending the SQL-based Join transform, and handling dimension table updates in CDC scenarios.

This is the Apache SeaTunnel architecture diagram from the official website. On the left are Apache SeaTunnel’s sources, on the right are the targets, and in the middle is the Apache SeaTunnel engine. Data is read from the source, passes through the transform layer in the middle, and is finally written to the Sink on the target end. SeaTunnel also supports submitting read, write, and transform tasks to external engines such as Spark and Flink, so there are corresponding translation layers that adapt Source, Sink, and Transform to Spark and Flink for reading, writing, and transforming data.

In the old versions, reads and writes were built on the connector capabilities of the external engine, so those versions supported SQL transforms. However, that SQL support was tightly coupled with Spark SQL and Flink SQL. To decouple this, from version 2.3 onwards the Source, Transform, and Sink connectors were unified under SeaTunnel’s own engine, and tasks are submitted to the Spark or Flink engine through the translation layer for computation and data read/write.

As such, SQL capabilities were removed from version 2.3 onwards.

02 Plugin Implementation

To implement your own Transform plugin extension, you need to implement a few interfaces: a SetConfig interface for receiving the plugin configuration, a TransformType interface for converting row types, and a TransformRow interface for converting row data.

On the left is an example configuration for such an extension. Its input table is the output table of the upstream step, and its output table can in turn serve as the input table of the downstream step. The example shown is a replace transform. The plugin needs to implement these interfaces, of which TransformType is one of the main ones: its input is the set of field types of an input row, and its output is the set of field types of the converted output row.

TransformRow is the core interface for data conversion. It receives a row of input data from upstream, converts it according to the defined conversion rules, and passes the converted result, together with the target type, downstream.
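To make this concrete, below is a rough sketch of how a replace transform could implement the three hooks just described. It assumes the SeaTunnelRow and SeaTunnelRowType types from the seatunnel-api module; the class shape and method names simply follow the description above and may not match the exact base classes of a given SeaTunnel release.

import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;

import java.util.Map;

// Sketch of the three hooks described above, using this talk's terminology.
public class ReplaceTransformSketch {

    private String replaceField;   // which field to rewrite, taken from the plugin config
    private String pattern;
    private String replacement;

    // "SetConfig": receive this plugin's block of the job configuration.
    public void setConfig(Map<String, Object> config) {
        this.replaceField = (String) config.get("replace_field");
        this.pattern = (String) config.get("pattern");
        this.replacement = (String) config.get("replacement");
    }

    // "TransformType": map the field types of input rows to the field types of output rows.
    // A replace transform only rewrites values, so the schema passes through unchanged.
    public SeaTunnelRowType transformRowType(SeaTunnelRowType inputRowType) {
        return inputRowType;
    }

    // "TransformRow": convert one upstream row into one downstream row.
    public SeaTunnelRow transformRow(SeaTunnelRowType inputRowType, SeaTunnelRow inputRow) {
        Object[] fields = inputRow.getFields().clone();
        int idx = inputRowType.indexOf(replaceField);
        fields[idx] = String.valueOf(fields[idx]).replaceAll(pattern, replacement);
        return new SeaTunnelRow(fields);
    }
}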

Next, let me introduce the implementation of the SQL transform plugin. Its configuration is relatively simple, as shown in the figure.

The input is the name of the upstream input table, and the output is the name of the output table. Below that is a single SELECT statement. I added some calculation logic in the example: for instance, select id+1 and alias it as id, and append an underscore to name and alias it as name.

There is also dateadd(sys_time, 1), which adds one day to the timestamp. The table name in the SQL must be consistent with the Source table name, that is, it must be fake here, otherwise an error is reported. Filtering conditions are also supported; for example, only rows with id > 10 are converted.

This is a diagram of the implementation logic. The upper layer is the Connector layer: the Source connector reads upstream data, which can come from offline or real-time data sources, and downstream is the Sink layer. The SQL conversion logic is mainly added in the middle, and it supports UDFs.

The Transform layer is thus extended to include SQL capabilities. That is the overall SQL implementation logic. In version 2.3.1, only simple SQL is supported, like the SQL I just demonstrated: selecting some fields, applying the corresponding function processing, and filtering conditions.

To do this, we parse the SQL into an abstract syntax tree and then generate a physical execution plan. The plugin also has a large built-in function library covering commonly used numeric, string, and time functions, and it supports UDFs: functions you develop yourself are loaded into the function library through the SPI mechanism.
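For illustration only, here is a rough sketch of that parsing step using JSqlParser; it is meant to show the shape of the abstract syntax tree and what the physical plan is built from, not the exact parser SeaTunnel uses internally.

import net.sf.jsqlparser.parser.CCJSqlParserUtil;
import net.sf.jsqlparser.statement.select.PlainSelect;
import net.sf.jsqlparser.statement.select.Select;

// Parse the transform SQL into an AST and pull out the pieces the physical plan
// is built from: the projection list, the source table, and the filter condition.
public class SqlAstSketch {
    public static void main(String[] args) throws Exception {
        String sql = "SELECT id, UPPER(name) AS name, DATEADD(systime, 1) AS systime "
                + "FROM fake WHERE id > 10";
        Select select = (Select) CCJSqlParserUtil.parse(sql);
        PlainSelect plain = (PlainSelect) select.getSelectBody();
        System.out.println("projection: " + plain.getSelectItems()); // one converter per output field
        System.out.println("from:       " + plain.getFromItem());    // must match the source table name
        System.out.println("filter:     " + plain.getWhere());       // row-level filter condition
    }
}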

After the abstract syntax tree is parsed and the physical execution plan is generated, there are two steps: type conversion and data conversion, which correspond to the two interface methods mentioned earlier. Type conversion converts the input types into the output types, and it involves not just simple field types but also functions and constants.

For example, if you write SELECT 1 directly, that 1 is a constant, so constants as well as functions need type conversion. Functions usually have a specified return type; numeric and string functions, for instance, have fixed return types. In some cases, however, the return type is not fixed and has to be determined from the types of the input fields, so functions must be processed recursively to obtain the final return type.
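As a purely hypothetical sketch of that recursion (the expression classes below are invented for this example and are not SeaTunnel’s internal classes), the resolver walks each SELECT item and recurses into nested function calls until it reaches a fixed return type, a column, or a constant:

import java.util.List;
import java.util.Map;

interface Expr {}
record Column(String name) implements Expr {}
record Constant(Object value) implements Expr {}
record FuncCall(String name, List<Expr> args) implements Expr {}

class TypeResolver {
    // Resolve the output type of one SELECT item, recursing into nested functions.
    static Class<?> resolve(Expr expr, Map<String, Class<?>> inputFieldTypes) {
        if (expr instanceof Column c) return inputFieldTypes.get(c.name());
        if (expr instanceof Constant k) return k.value().getClass(); // e.g. SELECT 1 gives Integer
        FuncCall f = (FuncCall) expr;
        return switch (f.name().toUpperCase()) {
            case "UPPER", "CONCAT" -> String.class;          // fixed return type
            case "DATEADD" -> java.time.LocalDateTime.class; // fixed return type
            // functions like COALESCE take their type from their arguments, so recurse
            default -> resolve(f.args().get(0), inputFieldTypes);
        };
    }
}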

The next layer is the data converter, which uses the physical execution plan, the function library, and the filter conditions to convert data. Its input is the value of each field of each input row; after evaluation against the physical execution plan, we get the value of each field of the output row.

At present, the SQL transform plugin only supports basic map-side transform capabilities, that is, lightweight “small t” transformations like the SQL in the example: only SELECT syntax and condition filtering over single rows. The SQL execution engine has a rich built-in function library, and the following is an example that uses it, like the function example I wrote earlier:


SELECT id as id, upper(name) as name, dateadd(systime, 1) as systime FROM tuser

In this example, id is selected and aliased as id, upper() converts name to uppercase and aliases it as name, and dateadd() adds or subtracts days from a date. The function library provides a large number of functions, roughly forty to fifty by my count, supports nesting, and standard SQL syntax can be used.

Next, I will introduce the implementation of UDFs. Extending based on the native Transform plugin is relatively cumbersome.

First, the TableTransformFactory interface needs to be implemented; this is the factory-class interface. The CheckTableTransform abstract class also needs to be implemented. In addition, you have to pay attention to Apache SeaTunnel’s transform configuration logic and manually develop the corresponding handling. That is why the SQL transform plugin supports UDFs (User-Defined Functions) and provides a brand-new interface for custom functions: for a UDF in the SQL transform plugin, you only need to implement the following SPI interface.

We provide a UDF interface called ZetaUDF, which has three main methods. The first is getFunctionName(), which returns the name of the function. The next is getResultType(), which specifies the return type of the function; as mentioned earlier, most functions have a fixed return type, but in some cases the return type is determined by the input types. Finally, there is evaluator(), which performs the function calculation: its inputs are the argument values passed to the function, and after the calculation it returns a concrete result.
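A minimal sketch of that SPI is shown below. The method names follow the description in this talk, and the SeaTunnelDataType type is assumed to come from the seatunnel-api module; the interface shipped in a given SeaTunnel release may use slightly different names and signatures, but the three responsibilities are the same.

import java.util.List;

import org.apache.seatunnel.api.table.type.SeaTunnelDataType;

// Sketch of the UDF SPI described above; implementations are discovered via Java SPI.
public interface ZetaUDF {
    // Name under which the function can be called in the transform SQL.
    String getFunctionName();

    // Return type of the function; it may depend on the argument types.
    SeaTunnelDataType<?> getResultType(List<SeaTunnelDataType<?>> argTypes);

    // Compute the function result from the already evaluated argument values.
    Object evaluator(List<Object> args);
}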

Here is an implementation example of a UDF for the SQL transform plugin: a custom DES encryption function. You only need to implement these three methods.

As you can see, custom functions are very straightforward. The first method defines the name of the function, in this case desEncrypt. The second method specifies the return type; because the function encrypts strings, the return type is string. The last is the computation logic of the function, which here encrypts the input value. The function takes two inputs: one is the password, and the other is the data to encrypt.

Encryption is performed with this data and password, using a DES encryption library for the actual operation. By implementing these three methods, the UDF development is complete.
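Below is a hypothetical sketch of such a desEncrypt UDF, implementing the interface sketched above with the JDK’s built-in DES cipher and returning the ciphertext as a Base64 string; the function name, key handling, and the BasicType import (from seatunnel-api) are illustrative assumptions rather than the exact code from the talk.

import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.List;

import javax.crypto.Cipher;
import javax.crypto.SecretKeyFactory;
import javax.crypto.spec.DESKeySpec;

import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;

// Hypothetical desEncrypt(password, data) UDF: DES-encrypt a string value.
public class DesEncryptUDF implements ZetaUDF {

    @Override
    public String getFunctionName() {
        return "DESENCRYPT";  // callable in the transform SQL as desEncrypt(...)
    }

    @Override
    public SeaTunnelDataType<?> getResultType(List<SeaTunnelDataType<?>> argTypes) {
        return BasicType.STRING_TYPE;  // a string goes in, an encrypted string comes out
    }

    @Override
    public Object evaluator(List<Object> args) {
        String password = String.valueOf(args.get(0));  // first argument: the key (DES needs at least 8 bytes)
        String data = String.valueOf(args.get(1));      // second argument: the value to encrypt
        try {
            DESKeySpec keySpec = new DESKeySpec(password.getBytes(StandardCharsets.UTF_8));
            Cipher cipher = Cipher.getInstance("DES");
            cipher.init(Cipher.ENCRYPT_MODE,
                    SecretKeyFactory.getInstance("DES").generateSecret(keySpec));
            return Base64.getEncoder()
                    .encodeToString(cipher.doFinal(data.getBytes(StandardCharsets.UTF_8)));
        } catch (Exception e) {
            throw new RuntimeException("DES encryption failed", e);
        }
    }
}

The implementation is then registered through the standard Java SPI mechanism (a META-INF/services entry) so that the SQL engine’s function library can discover it.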

It is clear that, compared to the extension of Transform we mentioned before, UDF development is more convenient because we only need to pay attention to the input parameter values and types of the corresponding functions, rather than the entire row of data.

03 Expansion Plan

Next are two topics: one is the plan for extending the SQL transform, and the other is the plan for Join support.

Currently we only support simple SQL capabilities. The next plan is to add Join capabilities, i.e., the extension in the picture.

When we need to splice in Join information, each row of data read here comes from the main table, and the Join information of the dimension table is read from a cache. If it is not in the cache, the corresponding dimension table record may be queried from the source, and the dimension table information is then put into the LRU cache after the query. The physical execution plan for the Join is then applied, and finally the data is spliced into a row of wide-table data and written to the corresponding target database.

The Join capability of the SQL transform plugin is similar to the dimension table join capability of Flink SQL. You define the Connector of the Join source and load the Source configuration; data is fetched from the source side according to the Join key or loaded into the cache in full, with cache strategies such as LRU caching and no caching.

If the main table data is sharded, it is recommended to shard by the Join key so that main table data and dimension table data land in the same shard. In addition, the SQL engine also needs to be extended to parse and execute physical execution plans that contain Joins.
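The per-row lookup described above could look roughly like the sketch below: check an LRU cache keyed by the Join key first, and fall back to querying the dimension source on a miss. The class and names are invented for illustration; SeaTunnel’s eventual implementation may differ.

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical dimension-table lookup: LRU cache by Join key with a fallback
// query against the dimension source; the cached value is the dimension row.
public class DimTableLookup<K, V> {

    private final Function<K, V> sourceQuery;  // e.g. SELECT ... FROM role WHERE id = ?
    private final Map<K, V> lruCache;

    public DimTableLookup(Function<K, V> sourceQuery, int maxCacheSize) {
        this.sourceQuery = sourceQuery;
        // access-ordered LinkedHashMap that evicts the least recently used entry
        this.lruCache = new LinkedHashMap<>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > maxCacheSize;
            }
        };
    }

    // Called once per main-table row with that row's Join key (e.g. roleId);
    // the returned dimension fields are spliced onto the row to form the wide table.
    public V lookup(K joinKey) {
        return lruCache.computeIfAbsent(joinKey, sourceQuery);
    }
}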

The following is a configuration example of the Join. The source table is at the top, and a join_table_name array is added below it, representing the list of dimension tables. Taking the user table as an example, it is left joined with the role table; the join key is the role table’s id, equal to the main table’s roleId.

The role’s name is then returned from the role table. In this way, a wide table is formed containing the user’s id, the user’s name, the role’s id, and the role’s name, and that data is output. Since we are planning to implement the Join function, we inevitably also need to consider dimension table updates.

04 CDC — Dimension Table Update Plan

In the CDC (Change Data Capture) scenario, updates to the dimension table are quite common.

Here is a simple plan for the dimension table update logic. Suppose we have joined the user table and the role table; the user table has three fields, one of which is the role field. After the join there are four fields: the first three come from the user table, and role_name comes from the role table.

How should historical data be handled after the dimension table is updated? This matters especially in the CDC scenario: newly arriving data is naturally joined against the new dimension values, but historical rows still carry the old ones. For example, if I update the name of the role with id = 1, a CDC event is generated, and we then update the target in batches based on that event.

The update operation performs a batch recalculation of role_name. The design idea is to update the dimension table fields in the target table in batches through the Join key. This scheme has certain limitations, mainly the following three points. First, the dimension table must live in a database that supports CDC, such as MySQL, Oracle, or PostgreSQL; otherwise the dimension table’s update events cannot be captured. Second, the target table must contain the join key field, because the update is applied through the target table’s role_id: the event carries no id information from the main table, only the dimension table’s id, which corresponds to the role_id field in the target table, so the target table must contain the dimension table’s key information.

The third condition is that the target table must support batch updates or batch queries through the Join key, not just through its primary key. For example, the target can be a relational database or Elasticsearch, which can query or update the corresponding role_name data through fields other than the primary key.
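A hypothetical sketch of that back-fill step, assuming a JDBC target: when a CDC update event arrives for the dimension table (for example, the role with id = 1 is renamed), the denormalized column in the target table is rewritten in one batch statement keyed by the Join key. The table name, column names, and connection details below are invented for illustration.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

// Hypothetical back-fill triggered by a dimension-table CDC update event.
public class DimUpdateBackfill {

    public static void applyRoleRename(long roleId, String newRoleName) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                        "jdbc:mysql://localhost:3306/target_db", "user", "pass");
             PreparedStatement ps = conn.prepareStatement(
                     // the target table must expose the Join key (role_id) for this to work
                     "UPDATE t_user_wide SET role_name = ? WHERE role_id = ?")) {
            ps.setString(1, newRoleName);
            ps.setLong(2, roleId);
            int updated = ps.executeUpdate();  // touches every historical row with that role_id
            System.out.println("back-filled rows: " + updated);
        }
    }
}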

That’s all for my sharing. Everyone who is interested is welcome to join us in building Apache SeaTunnel together!


Written by Apache SeaTunnel

The next-generation high-performance, distributed, massive data integration tool.
