Apache SeaTunnel (Incubating) supports IoTDB for IoT data synchronization
Apache IoTDB (Internet of Things Database) is a software system that integrates the collection, storage, management, and analysis of IoT time series data. It meets the needs of massive data storage, high-speed data reading, and complex data analysis in the Industrial Internet of Things. Apache SeaTunnel now supports an IoTDB Connector, which covers data synchronization scenarios in the IoT field.
At the Apache SeaTunnel community online meeting in October this year, Apache SeaTunnel Committer Wang Hailin explained how IoTDB access was implemented in Apache SeaTunnel. The talk gives users a deeper understanding of how to use IoTDB data synchronization and how it works.
Today I’m sharing how to use Apache SeaTunnel for data synchronization with IoTDB.
This session is divided into six parts. First, we will cover the basic concepts of Apache SeaTunnel. Building on that, we will focus on the features of the IoTDB Connector. Next, we will analyze how the IoTDB Connector reads and writes data and how these functions are implemented. Then we will show some typical usage scenarios and cases so you can see how to use the IoTDB Connector in production environments. Finally, we will cover the community’s next steps for the IoTDB Connector and how to get involved in contributing.
Introduction to Apache SeaTunnel basic concepts
This is the basic architecture of Apache SeaTunnel, an engine built for data synchronization, with a set of abstract APIs for reading data from and writing to a variety of data sources.
The left-hand side covers the Source side. We abstract the Source’s API, Type, and State:
- The API reads from the data source.
- The Type unifies the data types of the various data sources into the abstract types defined by Apache SeaTunnel.
- The State retains the read position during reading so that it can be recovered.

That is the abstraction for Source. We have done a similar abstraction for Sink, covering how data is written, how the data types map to the types of the real data source, and how the state is restored and retained.
On top of these APIs, a translation layer translates them to the corresponding execution engine. Apache SeaTunnel currently supports three execution engines: Spark, Flink, and our own engine, SeaTunnel Engine, which will be released soon.
This is roughly what Apache SeaTunnel does. It relies on Sources and Sinks, which we call Connectors, to read and write data for synchronization. A Connector consists of a Source and a Sink.
The diagram above shows the different data sources. The Source reads data from each data source and converts it into the abstraction layer formed by SeaTunnelRow and its Type. The Sink pulls data from that abstraction layer and converts it into the concrete format of the target data store as it writes.
The combination of Source + Abstraction Layer + Sink enables the synchronization of data between multiple heterogeneous data sources.
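To make the Source + abstraction layer + Sink idea concrete, here is a minimal Python sketch. It is not SeaTunnel’s Java API. The Row class stands in for SeaTunnelRow, and ListSource, CollectingSink, and run_job are hypothetical names. Together they show how a Source converts source-specific records into a common row type that any Sink can consume.

```python
from dataclasses import dataclass
from typing import Any, Dict, Iterator, List, Tuple

# Hypothetical classes for illustration only; not SeaTunnel's actual API.


@dataclass
class Row:
    """Stand-in for SeaTunnelRow: field values in an agreed column order."""
    fields: Tuple[Any, ...]


class ListSource:
    """Reads source-specific records and emits abstract Rows."""

    def __init__(self, records: List[Dict[str, Any]], columns: List[str]):
        self.records = records
        self.columns = columns  # the schema ("Type") shared with the Sink

    def read(self) -> Iterator[Row]:
        for record in self.records:
            # Convert the source format into the common Row abstraction.
            yield Row(tuple(record[c] for c in self.columns))


class CollectingSink:
    """Converts abstract Rows into the target store's format (here: dicts)."""

    def __init__(self, columns: List[str]):
        self.columns = columns
        self.written: List[Dict[str, Any]] = []

    def write(self, row: Row) -> None:
        self.written.append(dict(zip(self.columns, row.fields)))


def run_job(source: ListSource, sink: CollectingSink) -> None:
    """Source -> abstraction layer -> Sink; neither side knows the other's format."""
    for row in source.read():
        sink.write(row)


columns = ["ts", "temperature"]
source = ListSource([{"ts": 1, "temperature": 20.5, "extra": "x"}], columns)
sink = CollectingSink(columns)
run_job(source, sink)
print(sink.written)  # [{'ts': 1, 'temperature': 20.5}]
```

Because both sides only agree on the Row type, adding a new data source means writing one new Source, not one adapter per Sink.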
I’ll use a simple example below to illustrate how Apache SeaTunnel’s Source and Sink work.
The Sources and Sinks to combine, and how many of each, are specified in a configuration file. The commands in the Apache SeaTunnel toolkit take this configuration file as input and carry out the data synchronization when executed.
This is the Connector ecosystem currently supported by Apache SeaTunnel. It includes data sources such as JDBC, HDFS, Hive, Pulsar, and other message queues.
The list in the picture is not exhaustive. In the Apache SeaTunnel project on GitHub, the Plugins directory contains the supported Connector plugins. New ones are constantly being added, so check that directory for the latest Connectors.
IoTDB Connector Features
The following sections cover the IoTDB Connector.
First, we introduce the features of the IoTDB Connector integrated with Apache SeaTunnel and what exactly it supports.
Source Features
First, the Source supports typical usage scenarios such as batch reading of devices, field projection, data type mapping, and parallel reading.
As the table above shows, the IoTDB Source supports all the listed features except exactly-once semantics and streaming mode.
- Batch reads: IoTDB SQL has an ALIGN BY DEVICE clause, similar to grouping by device, which lets you read data from multiple devices in a single batch.
- Field projection: an IoTDB query always returns the Time column, and an align-by-device query also returns a Device column. By default, both can be projected onto Apache SeaTunnel columns.
- Data types: the only type not supported is Vector; all others are supported.
- Parallel reads: IoTDB data is timestamped, so we split the read into timestamp ranges and read them in parallel.
- State recovery: since the time range to be read is divided into separate splits, the read can be resumed from the split position information.
Sink functional features
The diagram above shows the features already supported by the IoTDB Sink. For metadata extraction, we support extracting metadata such as the device and measurements from a SeaTunnelRow. The time can be taken either from the SeaTunnelRow or from the current processing time. Batch commits and exception retries are also supported.
IoTDB data reading analysis
Next, we analyze the implementation and support for data reading.
Data type mapping
The first topic is data type mapping. Data read from IoTDB carries IoTDB data types, which have to be converted to Apache SeaTunnel data types.
The BOOLEAN, INT32, INT64, and other types listed here all have corresponding Apache SeaTunnel data types. INT32 is mapped according to the type declared on the Apache SeaTunnel side: to INT, or to TINYINT or SMALLINT when the range of values is small.
The Vector type is not currently supported.
The example code above shows where this type conversion is done.
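The read-side mapping can be summarized as a lookup table plus a range check for INT32. The sketch below is a Python illustration, not the connector’s Java code, and to_seatunnel_type and convert_int32 are hypothetical names. The text names BOOLEAN, INT32, and INT64 explicitly. The FLOAT, DOUBLE, and TEXT → STRING entries are assumptions based on the usual one-to-one correspondence.

```python
# Hypothetical helpers for illustration only; not the connector's actual code.
# Read-side mapping for IoTDB types that map one-to-one (partly assumed).
IOTDB_TO_SEATUNNEL = {
    "BOOLEAN": "BOOLEAN",
    "INT64": "BIGINT",
    "FLOAT": "FLOAT",
    "DOUBLE": "DOUBLE",
    "TEXT": "STRING",
}

# Value ranges of the SeaTunnel integer types an INT32 column may be read into.
INT32_TARGETS = {
    "TINYINT": (-128, 127),
    "SMALLINT": (-32768, 32767),
    "INT": (-2**31, 2**31 - 1),
}


def to_seatunnel_type(iotdb_type: str, declared: str = "INT") -> str:
    """Map an IoTDB type to a SeaTunnel type; `declared` only matters for INT32."""
    if iotdb_type == "VECTOR":
        raise NotImplementedError("VECTOR is not supported yet")
    if iotdb_type == "INT32":
        if declared not in INT32_TARGETS:
            raise ValueError(f"INT32 cannot be read as {declared}")
        return declared
    if iotdb_type not in IOTDB_TO_SEATUNNEL:
        raise ValueError(f"unknown IoTDB type: {iotdb_type}")
    return IOTDB_TO_SEATUNNEL[iotdb_type]


def convert_int32(value: int, declared: str) -> int:
    """Check that an INT32 value fits the narrower type declared in SeaTunnel."""
    low, high = INT32_TARGETS[declared]
    if not low <= value <= high:
        raise OverflowError(f"{value} does not fit in {declared}")
    return value


print(to_seatunnel_type("INT64"))              # BIGINT
print(to_seatunnel_type("INT32", "SMALLINT"))  # SMALLINT
print(convert_int32(100, "TINYINT"))           # 100
```

The range check is what makes reading INT32 into TINYINT or SMALLINT safe "when the range of values is small."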
Field projection
The second topic is field projection when reading. When reading IoTDB data, the Time field can be mapped automatically, and we can choose which Apache SeaTunnel type it maps to, such as TIMESTAMP or BIGINT.
The query SQL lets you select only the columns you need. On the Apache SeaTunnel side, you specify the name, type, etc. of each column after it is mapped to Apache SeaTunnel via fields. The final result of the data read on Apache SeaTunnel is shown in the figure above.
Note that the SQL does not select the time column, yet the result contains it. This is the projection of the time column field. The time column can be projected into different data types, and users can convert it as needed. The diagram above shows the implementation logic.
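Time-column projection amounts to converting IoTDB’s timestamp into whichever type the user declares. The Python sketch below makes two assumptions: timestamps are epoch milliseconds (IoTDB’s default precision), and TIMESTAMP is represented as a UTC datetime. The project_time helper is hypothetical.

```python
from datetime import datetime, timezone

# Hypothetical helper for illustration only; not the connector's actual code.


def project_time(epoch_ms: int, target_type: str):
    """Project IoTDB's Time value (epoch milliseconds) onto a declared type."""
    if target_type == "BIGINT":
        return epoch_ms  # keep the raw millisecond value
    if target_type == "TIMESTAMP":
        # Convert to seconds; UTC keeps the result independent of the host zone.
        return datetime.fromtimestamp(epoch_ms / 1000, tz=timezone.utc)
    raise ValueError(f"unsupported type for the time column: {target_type}")


# A row as returned by `SELECT temperature FROM root.test_group.d1`:
# the query does not select Time, but IoTDB returns it as the first value.
raw_row = (1664064000000, 25.1)
print(project_time(raw_row[0], "BIGINT"))     # 1664064000000
print(project_time(raw_row[0], "TIMESTAMP"))  # 2022-09-25 00:00:00+00:00
```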
Batch reading of devices
This is a common requirement, since we are likely to synchronize large batches of data from devices that share the same data structure.
Apache SeaTunnel supports the align-by-device syntax, so device columns can also be projected onto the SeaTunnelRow.
Suppose there is a table in IoTDB. With this syntax the device column is returned as data too, so we can project it onto Apache SeaTunnel. We configure the device name column and specify its data type. The data read on Apache SeaTunnel then has the format shown above: the Time column, the device column, and the actual data values. This makes it possible to read data from multiple devices with the same structure in bulk.
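With ALIGN BY DEVICE, IoTDB returns one row per (time, device), with the device path in its own Device column. Every device with the same measurements therefore fits the same row layout. The Python sketch below builds such a query and projects result records onto a fixed column order. The device paths, measurement names, the dict-shaped result records, and the align_by_device_sql and to_rows helpers are all hypothetical.

```python
from typing import Dict, List, Tuple

# Hypothetical helpers and data for illustration only.


def align_by_device_sql(measurements: List[str], path_pattern: str) -> str:
    """Build an IoTDB query that returns Time, Device and the measurements."""
    return f"SELECT {', '.join(measurements)} FROM {path_pattern} ALIGN BY DEVICE"


def to_rows(results: List[Dict[str, object]], fields: List[str]) -> List[Tuple]:
    """Project each result record onto the configured field order."""
    return [tuple(record[f] for f in fields) for record in results]


sql = align_by_device_sql(["temperature", "humidity"], "root.test_group.*")
print(sql)
# SELECT temperature, humidity FROM root.test_group.* ALIGN BY DEVICE

# Illustrative shape of the result set: one record per (time, device).
results = [
    {"Time": 1664064000000, "Device": "root.test_group.d1",
     "temperature": 25.1, "humidity": 60.0},
    {"Time": 1664064000000, "Device": "root.test_group.d2",
     "temperature": 24.8, "humidity": 58.5},
]
rows = to_rows(results, ["Time", "Device", "temperature", "humidity"])
print(rows[1])  # (1664064000000, 'root.test_group.d2', 24.8, 58.5)
```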
Parallel reading
Next is parallel reading.
- Split
To read in parallel, we partition the table by the Time column so that each parallel thread or process reads a specific range of data. You configure three parameters: a lower bound, an upper bound, and the number of partitions. From these, the original SQL is divided into several splits, each with its own time-range query condition. These become the SQL statements that are actually executed.
- Allocate Split to the reader
Once the split is done, there is an allocation logic to follow in order to distribute it to each parallel reader.
The assignment maps each split to a reader based on the split’s ID. Depending on the Connector, the resulting distribution may be fairly random, or more uniform if the split IDs hash well.
The result achieved is shown in the picture.
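Splitting and assignment can be sketched together in Python, under four assumptions:
- The three parameters are a lower bound, an exclusive upper bound, and a number of partitions.
- Each range is appended to the user’s SQL as a WHERE time condition, so the sketch assumes the original SQL has no WHERE clause of its own.
- Splits go to readers by a stable hash of the split ID modulo the reader count.
- The names make_splits, split_owner, and assign are hypothetical.

The connector’s exact boundary handling may differ.

```python
import zlib
from typing import Dict, List, Tuple

# Hypothetical helpers for illustration only; not the connector's actual code.
Split = Tuple[str, str]  # (split_id, query SQL)


def make_splits(sql: str, lower: int, upper: int, num_partitions: int) -> List[Split]:
    """Divide [lower, upper) into num_partitions time ranges, one query each."""
    step = -(-(upper - lower) // num_partitions)  # ceiling division
    splits = []
    for i in range(num_partitions):
        start = lower + i * step
        end = min(start + step, upper)
        if start >= end:
            break  # more partitions than time units: stop early
        splits.append((str(i), f"{sql} WHERE time >= {start} AND time < {end}"))
    return splits


def split_owner(split_id: str, num_readers: int) -> int:
    """Pick a reader for a split; crc32 keeps the hash stable across processes."""
    return zlib.crc32(split_id.encode()) % num_readers


def assign(splits: List[Split], num_readers: int) -> Dict[int, List[Split]]:
    """Group splits by the reader that owns them."""
    plan: Dict[int, List[Split]] = {r: [] for r in range(num_readers)}
    for split in splits:
        plan[split_owner(split[0], num_readers)].append(split)
    return plan


splits = make_splits("SELECT temperature FROM root.test_group.d1", 0, 100, 4)
print(splits[1][1])
# SELECT temperature FROM root.test_group.d1 WHERE time >= 25 AND time < 50
plan = assign(splits, 2)
```

A stable hash such as crc32 is used instead of Python’s built-in hash(), which is randomized per process for strings and would give different assignments on different machines.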
State recovery
State recovery also matters when reading. A large task takes a long time to read, and if an error or exception occurs midway, we need to recover the state from the point of failure and continue reading from there.
Apache SeaTunnel’s state recovery works mainly by having the reader store information about the Splits it has not yet read in its state. The engine periodically takes a snapshot of this state during reading. On recovery, we restore the last snapshot and continue reading from there.
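The recovery idea can be illustrated with a toy reader. This Python sketch is not SeaTunnel’s reader interface; SplitReader, snapshot_state, and restore are hypothetical names. The reader keeps the splits it has not finished, and a snapshot copies that list. After a failure, a new reader restored from the last snapshot re-reads only those splits.

```python
from typing import Callable, Dict, List

# Hypothetical classes for illustration only; not SeaTunnel's reader API.


class SplitReader:
    """Toy reader whose checkpointed state is the list of unfinished splits."""

    def __init__(self, pending: List[str], fetch: Callable[[str], List[int]]):
        self.pending = list(pending)
        self.fetch = fetch  # runs the query for one split
        self.output: List[int] = []

    def poll_one(self) -> None:
        """Read one whole split, then drop it from the pending list."""
        split = self.pending[0]
        self.output.extend(self.fetch(split))
        self.pending.pop(0)

    def snapshot_state(self) -> List[str]:
        # The engine would call this periodically and persist the result.
        return list(self.pending)

    @classmethod
    def restore(cls, state: List[str], fetch: Callable[[str], List[int]]) -> "SplitReader":
        return cls(state, fetch)


data: Dict[str, List[int]] = {"s0": [1, 2], "s1": [3, 4], "s2": [5, 6]}
reader = SplitReader(["s0", "s1", "s2"], data.__getitem__)
reader.poll_one()                     # s0 done
checkpoint = reader.snapshot_state()  # ['s1', 's2']
reader.poll_one()                     # s1 done, then the task fails

recovered = SplitReader.restore(checkpoint, data.__getitem__)
while recovered.pending:
    recovered.poll_one()
print(recovered.output)  # [3, 4, 5, 6]
```

s1 was read again because it finished after the last snapshot. Rows already emitted before a failure can therefore be delivered twice, which is consistent with the Source not supporting exactly-once.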
IoTDB Connector Data Write Analysis
Next, we analyze how data writing is implemented.
Data type mapping
Data writing also involves data type mapping. In contrast to data reading, Apache SeaTunnel data types are mapped to IoTDB data types. Since INT32 is IoTDB’s narrowest integer type, TINYINT and SMALLINT values are widened to INT32 when written. All other data types can be converted one-to-one; the ARRAY and VECTOR data types are not yet supported.
The diagram above shows the corresponding code; see our specific mapping implementation for the details of the logic.
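The write-side mapping is essentially the inverse, with the small integer types widened. In this Python sketch, to_iotdb_type is a hypothetical name. The entries not named in the text (FLOAT, DOUBLE, and STRING → TEXT) are assumptions.

```python
# Hypothetical helper for illustration only; not the connector's actual code.
SEATUNNEL_TO_IOTDB = {
    "BOOLEAN": "BOOLEAN",
    "TINYINT": "INT32",   # widened: IoTDB's narrowest integer type is INT32
    "SMALLINT": "INT32",  # widened, lossless since every value fits in 32 bits
    "INT": "INT32",
    "BIGINT": "INT64",
    "FLOAT": "FLOAT",     # assumed one-to-one
    "DOUBLE": "DOUBLE",   # assumed one-to-one
    "STRING": "TEXT",     # assumed one-to-one
}
UNSUPPORTED = {"ARRAY", "VECTOR"}


def to_iotdb_type(seatunnel_type: str) -> str:
    """Map a SeaTunnel type to the IoTDB type used when writing."""
    if seatunnel_type in UNSUPPORTED:
        raise NotImplementedError(f"{seatunnel_type} cannot be written yet")
    if seatunnel_type not in SEATUNNEL_TO_IOTDB:
        raise ValueError(f"unknown SeaTunnel type: {seatunnel_type}")
    return SEATUNNEL_TO_IOTDB[seatunnel_type]


print(to_iotdb_type("TINYINT"))  # INT32
print(to_iotdb_type("BIGINT"))   # INT64
```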
Dynamic injection of metadata
Apache SeaTunnel supports the dynamic injection of metadata.
When data from heterogeneous sources is written to IoTDB, the device, measurements, and time are extracted from each row by reading the configured columns of the SeaTunnelRow. If no time column is specified, the current system time is used instead. A storage group can also be configured; it is then automatically prepended to the device as a path prefix.
For example, suppose a row read into Apache SeaTunnel has the structure shown above. It can be configured to synchronize to IoTDB, with the result shown.
The temperature and humidity columns we need were extracted as measurements, and ts and the device name were extracted as the timestamp and device in IoTDB.
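Row-to-record extraction can be sketched in Python as below. The sketch makes these assumptions:
- The configuration is modeled as hypothetical arguments: a device column, measurement columns, an optional timestamp column, and an optional storage group.
- Timestamps are epoch milliseconds.
- The output is a plain tuple rather than a real IoTDB client call.
- extract_record is a hypothetical helper.

```python
import time
from typing import Any, Dict, List, Optional, Tuple

# Hypothetical helper for illustration only; not the connector's actual code.
Record = Tuple[str, int, List[str], List[Any]]  # (device, ts_ms, measurements, values)


def extract_record(
    row: Dict[str, Any],
    device_key: str,
    measurement_keys: List[str],
    timestamp_key: Optional[str] = None,
    storage_group: Optional[str] = None,
) -> Record:
    device = str(row[device_key])
    if storage_group:
        # The storage group is prepended to the device path.
        device = f"{storage_group}.{device}"
    if timestamp_key is not None:
        ts = int(row[timestamp_key])
    else:
        # No time column configured: fall back to the current processing time.
        ts = int(time.time() * 1000)
    values = [row[k] for k in measurement_keys]
    return device, ts, list(measurement_keys), values


row = {"ts": 1664064000000, "device_name": "d1",
       "temperature": 25.1, "humidity": 60.0, "note": "x"}
print(extract_record(row, "device_name", ["temperature", "humidity"], "ts", "root.test_group"))
# ('root.test_group.d1', 1664064000000, ['temperature', 'humidity'], [25.1, 60.0])
```

Columns that are not listed as measurements (note in this example) are simply not written.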
Batch commits and exception retries
In addition, the Sink needs to handle batching and retries when writing. For batching, you can configure the number of rows per batch commit and the commit interval. Since data is buffered in memory, a separate thread can be enabled to commit on a timer.
For retries, Apache SeaTunnel lets you configure the maximum number of retries, the waiting interval between retries, and the maximum waiting interval. Retrying stops immediately if a non-recoverable error is encountered.
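Here is a sketch of size-or-interval batching: the buffer is flushed when it reaches batch_size rows, or when batch_interval_ms has passed since the last flush. The class name, parameter names, and flush_fn callback are hypothetical. To keep the example deterministic, the interval is checked in an explicit tick() method with an injectable clock. In a real sink, a scheduled background thread would call tick(); the lock protects the buffer for that case.

```python
import threading
import time
from typing import Any, Callable, List

# Hypothetical class for illustration only; not the connector's actual code.


class BatchWriter:
    """Buffer rows and flush when the batch is full or the interval elapsed."""

    def __init__(self, flush_fn: Callable[[List[Any]], None], batch_size: int = 1024,
                 batch_interval_ms: int = 1000,
                 clock: Callable[[], float] = time.monotonic):
        self.flush_fn = flush_fn
        self.batch_size = batch_size
        self.batch_interval_s = batch_interval_ms / 1000
        self.clock = clock
        self.buffer: List[Any] = []
        self.last_flush = clock()
        self.lock = threading.Lock()  # write() and tick() may run on different threads

    def write(self, row: Any) -> None:
        with self.lock:
            self.buffer.append(row)
            if len(self.buffer) >= self.batch_size:
                self._flush_locked()

    def tick(self) -> None:
        """Called periodically (e.g. by a timer thread) to flush on the interval."""
        with self.lock:
            if self.buffer and self.clock() - self.last_flush >= self.batch_interval_s:
                self._flush_locked()

    def _flush_locked(self) -> None:
        batch, self.buffer = self.buffer, []
        self.flush_fn(batch)
        self.last_flush = self.clock()


# Deterministic demo with a fake clock.
now = [0.0]
batches: List[List[int]] = []
writer = BatchWriter(batches.append, batch_size=3, batch_interval_ms=1000,
                     clock=lambda: now[0])
for i in range(4):
    writer.write(i)  # the third write fills the batch and flushes [0, 1, 2]
now[0] = 1.5
writer.tick()        # 1.5 s since the last flush: flushes the leftover [3]
print(batches)       # [[0, 1, 2], [3]]
```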
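Retries with a growing wait and an early exit on non-recoverable errors could look like the Python sketch below. The parameters are hypothetical names that mirror the three settings described above, and so is the RecoverableError class that separates retryable failures. The wait grows linearly with the attempt number and is capped at max_backoff_ms. The sleep function is injectable so the example runs instantly.

```python
import time
from typing import Callable, List, TypeVar

# Hypothetical names for illustration only; not the connector's actual code.
T = TypeVar("T")


class RecoverableError(Exception):
    """Errors worth retrying, such as a temporary connection failure."""


def with_retries(action: Callable[[], T], max_retries: int = 3, backoff_ms: int = 100,
                 max_backoff_ms: int = 1000,
                 sleep: Callable[[float], None] = time.sleep) -> T:
    """Run action, retrying RecoverableError; other exceptions propagate at once."""
    attempt = 0
    while True:
        try:
            return action()
        except RecoverableError:
            if attempt >= max_retries:
                raise  # retries exhausted
            attempt += 1
            wait_ms = min(backoff_ms * attempt, max_backoff_ms)
            sleep(wait_ms / 1000)


calls = {"n": 0}
waits: List[float] = []


def flaky_write() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise RecoverableError("connection reset")
    return "ok"


print(with_retries(flaky_write, sleep=waits.append))  # ok
print(calls["n"], waits)                              # 3 [0.1, 0.2]
```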
IoTDB Connector Usage Examples
After the previous analysis of reading and writing data, let’s look at three typical examples of usage scenarios.
Exporting data from IoTDB
The first scenario is exporting data from IoTDB. The example here reads data from IoTDB and outputs it to the Console.
- Read in parallel, output to Console
-Parallelism: 2
-Number of batches: 24
-Time frame: 2022-09-25 ~ 2022-09-26
Let’s assume we have a data table in IoTDB and want to export its data to the Console. The whole configuration is shown above. It maps the columns of data we want to export and sets the time range to query.
This is the simplest example, but in practice, the Sink side may be more complex, so you will need to refer to the documentation of the corresponding data source for the appropriate configuration.
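To see what the export settings in this scenario imply, here is the arithmetic: one day divided into 24 batches gives one-hour ranges, shared by 2 parallel readers. The Python sketch computes those boundaries in epoch milliseconds. It assumes the time frame is in UTC (the time zone is not stated) and that the end date is exclusive. For simplicity it assigns batch i to reader i % 2 instead of hashing split IDs.

```python
from datetime import datetime, timezone

# Illustrative arithmetic only; UTC and an exclusive end date are assumptions.
start_ms = int(datetime(2022, 9, 25, tzinfo=timezone.utc).timestamp() * 1000)
end_ms = int(datetime(2022, 9, 26, tzinfo=timezone.utc).timestamp() * 1000)
num_batches, parallelism = 24, 2

step_ms = (end_ms - start_ms) // num_batches
ranges = [(start_ms + i * step_ms, start_ms + (i + 1) * step_ms)
          for i in range(num_batches)]
readers = {r: [i for i in range(num_batches) if i % parallelism == r]
           for r in range(parallelism)}

print(step_ms)                           # 3600000 (one hour)
print(ranges[0])                         # (1664064000000, 1664067600000)
print(len(readers[0]), len(readers[1]))  # 12 12
```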
Importing data to IoTDB
- Read database, batch write to IoTDB
-Batch writing: one commit every 1024 entries or every 1000 ms
-Extract metadata: device, timestamp, measurements
-Specify the storage group: root.test_group
Another typical usage scenario is importing data from other data sources into IoTDB. Suppose I have an external database table with columns such as ts, temperature, and humidity. We import it into IoTDB, keeping the temperature and humidity columns and leaving the rest out. The whole configuration is shown in the diagram above for reference.
On the Sink side, you mainly specify the keys: which column the device is extracted from, which column the time is extracted from, which columns are written to IoTDB as measurements, and so on.
As you can see, we can also configure the IoTDB storage group, here root.test_group.
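To show what this import produces, the sketch below renders each resulting data point as an IoTDB SQL INSERT statement. It uses the storage group root.test_group and keeps only temperature and humidity. It is only a readable rendering of the data points, not necessarily how the connector sends them to IoTDB. The source column names (ts, device, pressure) and the to_insert_sql helper are hypothetical.

```python
from typing import Any, Dict, List

# Hypothetical helper and column names for illustration only.
STORAGE_GROUP = "root.test_group"
MEASUREMENTS = ["temperature", "humidity"]  # other columns are left out


def to_insert_sql(row: Dict[str, Any]) -> str:
    """Render one source row as an IoTDB INSERT statement."""
    device_path = f"{STORAGE_GROUP}.{row['device']}"
    columns = ", ".join(["timestamp"] + MEASUREMENTS)
    values = ", ".join(str(v) for v in [row["ts"]] + [row[m] for m in MEASUREMENTS])
    return f"INSERT INTO {device_path}({columns}) VALUES ({values})"


rows: List[Dict[str, Any]] = [
    {"ts": 1664064000000, "device": "d1", "temperature": 25.1,
     "humidity": 60.0, "pressure": 1013},
]
for r in rows:
    print(to_insert_sql(r))
# INSERT INTO root.test_group.d1(timestamp, temperature, humidity) VALUES (1664064000000, 25.1, 60.0)
```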
Synchronizing data between IoTDB
The third scenario is synchronizing data between IoTDB instances with batch writes. Suppose a table in one IoTDB needs to be synchronized to another IoTDB, and after synchronization both the storage group and the measurement (indicator) names of the data columns change. In that case, you can use projection to rewrite the measurement names and use SQL to rewrite the storage group.
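The renaming in this scenario comes down to two steps: rewriting the storage-group prefix of each device path, and mapping old measurement names to new ones. In this Python sketch, the storage groups root.sg_old and root.sg_new, the measurement names, and the migrate helper are hypothetical. In the actual job, the renaming is expressed through the source SQL and field projection described above, not in code.

```python
from typing import List, Tuple

# Hypothetical names for illustration only.
OLD_SG, NEW_SG = "root.sg_old", "root.sg_new"
RENAME = {"temp": "temperature", "hum": "humidity"}


def migrate(device: str, measurements: List[str]) -> Tuple[str, List[str]]:
    """Move a device to the new storage group and rename its measurements."""
    if not device.startswith(OLD_SG + "."):
        raise ValueError(f"{device} is not under {OLD_SG}")
    new_device = NEW_SG + device[len(OLD_SG):]
    new_measurements = [RENAME.get(m, m) for m in measurements]  # unmapped names kept
    return new_device, new_measurements


print(migrate("root.sg_old.d1", ["temp", "hum", "status"]))
# ('root.sg_new.d1', ['temperature', 'humidity', 'status'])
```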
How to get involved in contribution
Finally, a few words about the next steps for the IoTDB Connector and how you can get involved in improving the Connector and contributing new features that are needed.
Next steps for the IoTDB Connector
- Support for reading and writing vector data types
- Support for TsFile reads and writes
- Support for writing TsFiles and loading them into IoTDB
Apache SeaTunnel Connector access issue.
https://github.com/apache/incubator-seatunnel/issues/3012
This is our planned Apache SeaTunnel Connector access issue, with a list of features and connectors to be implemented, which you can contribute to if you are interested.
Here are some introductory guidelines from the Apache SeaTunnel community on Connector access contributions.
- Find the issue you are interested in.
-https://github.com/apache/incubator-seatunnel/issues/2828
-https://github.com/apache/incubator-seatunnel/issues/1946
-https://github.com/apache/incubator-seatunnel/issues?q=is:open+is:issue+label:"help+wanted"
- Refer to the contribution guide
-https://github.com/apache/incubator-seatunnel/issues/2828
-https://github.com/apache/incubator-seatunnel/pull/2995 [Contribution Guidelines].
-https://github.com/apache/incubator-seatunnel/blob/dev/docs/en/contribution/setup.md [Compilation Guide]
- Tutorials for Beginners
-Code Demo for Apache SeaTunnel Connector Development Process
-In-depth look at Connector development with Apache SeaTunnel’s new API
- Join the discussion & get help
-Discuss on mailing lists, Slack
-dev-subscribe@seatunnel.apache.org
-https://join.slack.com/t/apacheseatunnel/shared_invite/zt-1kcxzyrxz-lKcF3BAyzHEmpcc4OSaCjQ
-Take part in the PR Review and give your opinion
The second phase of the Apache SeaTunnel Connector Access Incentive Program has closed successfully, but the program is open all the time. You are welcome to participate in the campaign and improve the
Link to the program: