Sharing practical experience of using SeaTunnel to synchronize HTTP to Doris

Apache SeaTunnel · Jul 22, 2024

Our projects need to integrate data from several different sources into our data warehouse, and after comparing the available options we chose Apache SeaTunnel (Comparison Reference).

Currently, the interface we are using does not require authentication. If authentication is needed in the future, we will discuss and test that as well.

Actual Usage

Apache SeaTunnel Version: 2.3.4

Without further ado, here is the final configuration. Since I submit jobs through the REST API in JSON format, that is the form shown below.

The difference between the REST and conf submission methods lies in the job execution environment: a conf file runs through ClientJobExecutionEnvironment (which, in my testing, also accepts JSON), while the REST API uses RestJobExecutionEnvironment.
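
For reference, here is roughly how the JSON job definition below can be posted to the SeaTunnel Engine REST API from Java. The host, port 5801, the file name http_to_doris.json, and the /hazelcast/rest/maps/submit-job endpoint are assumptions based on the default 2.3.4 deployment, so adjust them to your cluster; this is only a minimal sketch, not the exact script I use.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Files;
import java.nio.file.Path;

public class SubmitJobSketch {
    public static void main(String[] args) throws Exception {
        // Read the job configuration (the JSON shown in this post) from a local file.
        String jobJson = Files.readString(Path.of("http_to_doris.json"));

        // Assumed default endpoint of the SeaTunnel Engine (Zeta) REST API in 2.3.4.
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://127.0.0.1:5801/hazelcast/rest/maps/submit-job?jobName=SeaTunnel_Job"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jobJson))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        // On success the engine responds with the job id and name as JSON.
        System.out.println(response.statusCode() + " " + response.body());
    }
}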

Data Format Returned by the Interface

{
  "code": "0000",
  "msg": "Success",
  "data": {
    "records": [
      {
        "id": "1798895733824393218",
        "taskContent": "License02",
        "taskType": "License"
      }
    ]
  }
}
// The actual data is paginated; the above is a sample.

Integration Configuration

{
  "env": {
    "job.mode": "BATCH",
    "job.name": "SeaTunnel_Job"
  },
  "source": [
    {
      "result_table_name": "Table13367210156032",
      "plugin_name": "Http",
      "url": "http://*.*.*.*:*/day_plan_repair/page",
      "method": "GET",
      "format": "json",
      "json_field": {
        "id": "$.data.records[*].id",
        "taskContent": "$.data.records[*].taskContent",
        "taskType": "$.data.records[*].taskType"
      },
      // "pageing": {
      //   "page_field": "current",
      //   "batch_size": 10
      // },
      "schema": {
        "fields": {
          "id": "BIGINT",
          "taskContent": "STRING",
          "taskType": "STRING"
        }
      }
    }
  ],
  "transform": [
    {
      "field_mapper": {
        "id": "id",
        "taskContent": "task_content",
        "taskType": "task_type"
      },
      "result_table_name": "Table13367210156033",
      "source_table_name": "Table13367210156032",
      "plugin_name": "FieldMapper"
    }
  ],
  "sink": [
    {
      "source_table_name": "Table13367210156033",
      "plugin_name": "Doris",
      "fenodes": "*.*.*.*:*",
      "database": "test",
      "password": "****",
      "username": "****",
      "table": "ods_day_plan",
      "sink.label-prefix": "test-ods_day_plan",
      "sink.enable-2pc": false,
      "data_save_mode": "APPEND_DATA",
      "schema_save_mode": "CREATE_SCHEMA_WHEN_NOT_EXIST",
      "save_mode_create_template": "CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` (\n ${rowtype_fields}\n ) ENGINE=OLAP\n UNIQUE KEY (id)\n DISTRIBUTED BY HASH (id)\n PROPERTIES (\n \"replication_allocation\" = \"tag.location.default: 1\",\n \"in_memory\" = \"false\",\n \"storage_format\" = \"V2\",\n \"disable_auto_compaction\" = \"false\"\n )",
      "sink.enable-delete": true,
      "doris.config": {
        "format": "json",
        "read_json_by_line": "true"
      }
    }
  ]
}

Issues Encountered During Usage

Handle Save Mode Failed

Caused by: java.sql.SQLException: errCode = 2, detailMessage = Syntax error in line 21:
UNIQUE KEY ()
^
Encountered: )
Expected: IDENTIFIER

Solution: see this [issue](https://github.com/apache/seatunnel/issues/6646).

This issue was resolved by using the `save_mode_create_template` field in the configuration file, which can be customized according to business needs.
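
As I understand it, the root cause is that the default template builds the key list from the ${rowtype_primary_key} placeholder, and the HTTP source table carries no primary key, so the placeholder expands to nothing and produces the empty UNIQUE KEY () above. Hardcoding the key column in a custom template avoids this. A minimal sketch of the relevant change (the default template below is quoted from memory of the 2.3.4 Doris sink docs, so verify it against your version):

// Default style (fails when the upstream table has no primary key):
"save_mode_create_template": "CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` (\n ${rowtype_primary_key},\n ${rowtype_fields}\n ) ENGINE=OLAP\n UNIQUE KEY (${rowtype_primary_key})\n DISTRIBUTED BY HASH (${rowtype_primary_key})\n ..."

// Customized (key column fixed to id, as in the configuration above):
"save_mode_create_template": "CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` (\n ${rowtype_fields}\n ) ENGINE=OLAP\n UNIQUE KEY (id)\n DISTRIBUTED BY HASH (id)\n ..."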

NoSuchMethodError

java.lang.NoSuchMethodError: retrofit2.Retrofit$Builder.client(Lshaded/okhttp3/OkHttpClient;)Lretrofit2/Retrofit$Builder;
at org.influxdb.impl.InfluxDBImpl.<init>(InfluxDBImpl.java:179) ~[connector-influxdb-2.3.4.jar:2.3.4]
at org.influxdb.impl.InfluxDBImpl.<init>(InfluxDBImpl.java:120) ~[connector-influxdb-2.3.4.jar:2.3.4]
at org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient.getInfluxDB(InfluxDBClient.java:72) ~[connector-influxdb-2.3.4.jar:2.3.4]

When using the InfluxDB connector, I ran into a jar conflict: the `retrofit2` dependency the InfluxDB client uses to create its HTTP connection clashed with the version bundled in the `datahub` connector. Since I was not using `datahub`, removing the `datahub` connector jar solved the problem.

Apache Doris BIGINT Type Precision Loss Issue

See the post for details.

Configuring Primary Key

When configuring the save_mode_create_template for Doris, the key column type must be a numeric or date type.

The id field is returned by the interface as a string, but it is an all-digit value generated by the Snowflake algorithm, so it is declared as BIGINT in the source schema and converted automatically.

This matters because the UNIQUE KEY in the sink's save_mode_create_template uses id as the key column, and Doris requires that key columns be numeric or date types.

Personal Experience

  1. When there is only one source, transform, or sink, the result_table_name and source_table_name configuration items can be omitted.
  2. Download the source code, add log statements where needed, then repackage and replace the corresponding jar in the SeaTunnel runtime; this makes it much easier to understand the code and to trace the results you care about through the logs.
  3. Building on the previous point, once you are familiar with the code you can do secondary development, for example to handle interfaces that require token authentication (see the headers sketch after the example below).
  4. Note that the JsonPath expressions in the source's json_field do not support extracting values from complex types inside lists (Array<Object> or Map<String, Object>), as illustrated in the example below; consider secondary development to work around this (a rough idea is sketched after the testing code).
// Example:
{
  "code": "0000",
  "msg": "Success",
  "data": {
    "records": [
      {
        "id": "1798895733824393218",
        "taskContent": "License02",
        "taskType": "License",
        // A nested list like region_list cannot be parsed and synced;
        // $.data.records[*].region_list[*].id causes a data/total mismatch error.
        "region_list": [
          {
            "id": "1",
            "name": "11"
          },
          {
            "id": "1",
            "name": "11"
          }
        ]
      }
    ]
  }
}
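
Regarding point 3 above: for a simple static token, the Http source's headers option may already be enough without any code changes; secondary development mainly becomes necessary when the token has to be fetched or refreshed dynamically. A minimal sketch (the header name and token value are placeholders for your interface):

{
  "plugin_name": "Http",
  "url": "http://*.*.*.*:*/day_plan_repair/page",
  "method": "GET",
  "format": "json",
  "headers": {
    "Authorization": "Bearer <your-token>"
  }
}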

Testing Code (Using JDK 17)

private static final Option[] DEFAULT_OPTIONS = {
    Option.SUPPRESS_EXCEPTIONS, Option.ALWAYS_RETURN_LIST, Option.DEFAULT_PATH_LEAF_TO_NULL
};
private JsonPath[] jsonPaths;
private final Configuration jsonConfiguration =
        Configuration.defaultConfiguration().addOptions(DEFAULT_OPTIONS);

@Test
public void test5() {
    String data = """
            {
              "code": "0000",
              "msg": "Success",
              "data": {
                "records": [
                  {
                    "id": "1798895733824393218",
                    "taskContent": "12312312313"
                  }
                ]
              }
            }
            """;
    Map<String, String> map = new HashMap<>();
    map.put("id", "$.data.records[*].id");
    map.put("taskContent", "$.data.records[*].taskContent");
    JsonField jsonField = JsonField.builder().fields(map).build();
    initJsonPath(jsonField);
    data = JsonUtils.toJsonNode(parseToMap(decodeJSON(data), jsonField)).toString();
    log.error(data);
}

// The following methods are copied from HttpSourceReader.

// Compile one JsonPath per configured field, in the order of the json_field map.
private void initJsonPath(JsonField jsonField) {
    jsonPaths = new JsonPath[jsonField.getFields().size()];
    for (int index = 0; index < jsonField.getFields().keySet().size(); index++) {
        jsonPaths[index] =
                JsonPath.compile(
                        jsonField.getFields().values().toArray(new String[] {})[index]);
    }
}

// Combine each row of extracted values with the configured field names.
private List<Map<String, String>> parseToMap(List<List<String>> datas, JsonField jsonField) {
    List<Map<String, String>> decodeDatas = new ArrayList<>(datas.size());
    String[] keys = jsonField.getFields().keySet().toArray(new String[] {});

    for (List<String> data : datas) {
        Map<String, String> decodeData = new HashMap<>(jsonField.getFields().size());
        final int[] index = {0};
        data.forEach(
                field -> {
                    decodeData.put(keys[index[0]], field);
                    index[0]++;
                });
        decodeDatas.add(decodeData);
    }

    return decodeDatas;
}

// Evaluate every JsonPath against the response and check that all columns return
// the same number of values; otherwise the records cannot be aligned into rows.
private List<List<String>> decodeJSON(String data) {
    ReadContext jsonReadContext = JsonPath.using(jsonConfiguration).parse(data);
    List<List<String>> results = new ArrayList<>(jsonPaths.length);
    for (JsonPath path : jsonPaths) {
        List<String> result = jsonReadContext.read(path);
        results.add(result);
    }
    for (int i = 1; i < results.size(); i++) {
        List<?> result0 = results.get(0);
        List<?> result = results.get(i);
        if (result0.size() != result.size()) {
            throw new HttpConnectorException(
                    HttpConnectorErrorCode.FIELD_DATA_IS_INCONSISTENT,
                    String.format(
                            "[%s](%d) and [%s](%d) the number of parsing records is inconsistent.",
                            jsonPaths[0].getPath(),
                            result0.size(),
                            jsonPaths[i].getPath(),
                            result.size()));
        }
    }

    return dataFlip(results);
}

// Transpose the column-oriented results (one list per JsonPath) into row-oriented records.
private List<List<String>> dataFlip(List<List<String>> results) {
    List<List<String>> datas = new ArrayList<>();
    for (int i = 0; i < results.size(); i++) {
        List<String> result = results.get(i);
        if (i == 0) {
            for (Object o : result) {
                String val = o == null ? null : o.toString();
                List<String> row = new ArrayList<>(jsonPaths.length);
                row.add(val);
                datas.add(row);
            }
        } else {
            for (int j = 0; j < result.size(); j++) {
                Object o = result.get(j);
                String val = o == null ? null : o.toString();
                List<String> row = datas.get(j);
                row.add(val);
            }
        }
    }
    return datas;
}
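
As a rough idea for point 4, one possible direction for secondary development is to read whole records with a single JsonPath and re-serialize nested structures such as region_list into JSON strings, so each record still flattens to one row and can be stored in a STRING column. This is not part of the connector; it is only a sketch using the same jayway json-path library together with Jackson:

import com.fasterxml.jackson.databind.ObjectMapper;
import com.jayway.jsonpath.JsonPath;

import java.util.List;
import java.util.Map;

public class NestedFieldSketch {
    public static void main(String[] args) throws Exception {
        String body = """
                {"data": {"records": [
                  {"id": "1798895733824393218", "taskContent": "License02",
                   "region_list": [{"id": "1", "name": "11"}, {"id": "2", "name": "22"}]}
                ]}}
                """;

        ObjectMapper mapper = new ObjectMapper();
        // Read whole records instead of one JsonPath per column, so nested values stay attached to their row.
        List<Map<String, Object>> records = JsonPath.read(body, "$.data.records[*]");
        for (Map<String, Object> record : records) {
            // Scalars pass through unchanged; nested lists/maps are re-serialized to a JSON
            // string that can be loaded into a STRING column and expanded later in the warehouse.
            String regionListJson = mapper.writeValueAsString(record.get("region_list"));
            System.out.println(record.get("id") + " -> " + regionListJson);
        }
    }
}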

I hope this experience sharing will be helpful to everyone!
