Troubleshooting Apache SeaTunnel S3 File Write Errors


During my use of Apache SeaTunnel, I encountered an issue with writing files to S3. Through in-depth debugging and analysis, I identified the problem and proposed corresponding solutions.

This article will detail the error, reference materials, troubleshooting steps, and future research directions. I hope it will be helpful to you!

Error Details

2024–04–12 20:44:18,647 ERROR [.c.FileSinkAggregatedCommitter] [hz.main.generic-operation.thread-43] — commit aggregatedCommitInfo error, aggregatedCommitInfo = FileAggregatedCommitInfo(transactionMap={/xugurtp/seatunnel/tmp/seatunnel/831147703474847745/476b6a6fc7/T_831147703474847745_476b6a6fc7_0_1={/xugurtp/seatunnel/tmp/seatunnel/831147703474847745/476b6a6fc7/T_831147703474847745_476b6a6fc7_0_1/NON_PARTITION/output_params_0.json=/xugurtp/seatunnel/tmp/6af80b38f3434aceb573cc65b9cd12216a/39111/output_params_0.json}}, partitionDirAndValuesMap={}) java.lang.IllegalStateException: Connection pool shut down


Troubleshooting Steps

1. Remote Debugging

The error did not appear when debugging locally in IDEA, but it did occur when running on the server, so I decided to attach a remote debugger by adding the following JVM parameter:

-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005

The actual command is:

 java -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 -Dhazelcast.client.config=/opt/module/seatunnel-2.3.4/config/hazelcast-client.yaml -Dseatunnel.config=/opt/module/seatunnel-2.3.4/config/seatunnel.yaml -Dhazelcast.config=/opt/module/seatunnel-2.3.4/config/hazelcast.yaml -Dlog4j2.configurationFile=/opt/module/seatunnel-2.3.4/config/log4j2_client.properties -Dseatunnel.logs.path=/opt/module/seatunnel-2.3.4/logs -Dseatunnel.logs.file_name=seatunnel-starter-client -Xms1024m -Xmx1024m -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/seatunnel/dump/zeta-client -XX:MaxMetaspaceSize=1g -XX:+UseG1GC -cp /opt/module/seatunnel-2.3.4/lib/*:/opt/module/seatunnel-2.3.4/starter/seatunnel-starter.jar org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient -e local --config job/s3_sink.conf -cn xxx

2. Identifying the Issue

Through debugging, I found that the problem lay in the connection pool object cached by hadoop-aws. The key was an if-statement: when fs.s3a.impl.disable.cache=true is passed from upstream, caching is disabled. Further debugging revealed that hadoopConf.getSchema() sometimes returned s3n instead of s3a.

Difference between s3, s3n, and s3a:

  • s3: Block-based file system
  • s3n: Object storage-based file system supporting up to 5GB objects
  • s3a: Object storage-based file system supporting up to 5TB objects with better performance

Although s3a was set in the configuration file, s3n was being retrieved, which was clearly incorrect.
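
For context on the caching behavior mentioned above: Hadoop's FileSystem.get() caches file system instances per scheme, authority, and user, and the cache can be switched off per scheme with fs.<scheme>.impl.disable.cache. The snippet below is a minimal illustration, not SeaTunnel code; the bucket name is hypothetical, and actually running it requires hadoop-aws and valid credentials on the classpath. The URI scheme also decides which implementation class fs.<scheme>.impl resolves to.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class S3aCacheDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // The URI scheme decides which implementation class is loaded:
        // fs.s3a.impl -> S3AFileSystem, fs.s3n.impl -> NativeS3FileSystem
        conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
        // FileSystem.get() normally caches instances per (scheme, authority, user);
        // this flag forces a fresh instance for every call using the s3a scheme.
        conf.setBoolean("fs.s3a.impl.disable.cache", true);

        FileSystem fs1 = FileSystem.get(URI.create("s3a://my-bucket/"), conf);
        FileSystem fs2 = FileSystem.get(URI.create("s3a://my-bucket/"), conf);
        // With the cache disabled these are distinct objects, so closing one
        // cannot shut down a connection pool that another writer still uses.
        System.out.println(fs1 == fs2); // false
    }
}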

3. In-depth Analysis

Reviewing the error stack trace confirmed that the failure occurred during the commit phase. This indicated that S3Conf had been initialized without going through the buildWithConfig method and was therefore running with its default values. Further debugging confirmed that S3Conf's schema had indeed fallen back to the default.

This was tricky to pin down, because it goes beyond the connector (plugin) level into the engine layer, where classloaders and deserialization come into play:

Deserialization code:

logicalDag =
        CustomClassLoadedObject.deserializeWithCustomClassLoader(
                nodeEngine.getSerializationService(),
                classLoader,
                jobImmutableInformation.getLogicalDag());

Here it became clear that the S3Conf class had been reinitialized, causing SCHEMA to be reassigned to s3n.

Because the schema is kept in a static property of S3Conf, it is not carried along with the serialized object; when the class is loaded again by the custom classloader during deserialization, the static initializer runs and SCHEMA falls back to the default s3n.
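
To make the failure mode concrete, the problematic shape looks roughly like the sketch below. This is a simplified illustration based on the description above, not the exact SeaTunnel source; it borrows the HadoopConf base class and method names from the S3HadoopConf excerpt further down.

// Simplified sketch of the problematic pattern; names are illustrative.
public class S3Conf extends HadoopConf {
    private static final String S3A_SCHEMA = "s3a";
    private static final String DEFAULT_SCHEMA = "s3n";

    // Static field: assigned by buildWithConfig, but reset to the default
    // whenever the class is initialized again by another classloader.
    protected static String SCHEMA = DEFAULT_SCHEMA;

    private S3Conf(String hdfsNameKey) {
        super(hdfsNameKey);
    }

    public static HadoopConf buildWithConfig(String bucketName) {
        if (bucketName.startsWith(S3A_SCHEMA)) {
            SCHEMA = S3A_SCHEMA; // mutates class-level state, not instance state
        }
        return new S3Conf(bucketName);
    }

    @Override
    public String getSchema() {
        return SCHEMA; // reads whatever the currently loaded class holds
    }
}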

In summary

In addition to the source and sink stages, the aggregated commit operation also writes to the S3 file system. Since the error occurred during commit, S3Conf was evidently not initialized through buildWithConfig there and ran with its default values instead.

Because the relevant attribute of the S3Conf class is static, it is re-initialized when the class is loaded again during deserialization, so SCHEMA is reset to the default s3n.

Reference: https://wiki.apache.org/hadoop/AmazonS3

S3: Block-based file system

S3 Block FileSystem (URI scheme: s3): a block-based file system backed by S3. Files are stored as blocks, just like in HDFS, which allows for efficient renaming. This file system requires you to dedicate a bucket to it: you should not use an existing bucket that contains files, or write other files to the same bucket. Files stored by this file system can be larger than 5GB, but they are not interoperable with other S3 tools.

S3n: Object storage-based file system

S3 Native FileSystem (URI scheme: s3n) A native file system for reading and writing regular files on S3. The advantage of this file system is that you can access files on S3 written with other tools. Conversely, other tools can access files written with Hadoop. The disadvantage is that S3 has a file size limit of 5GB.

S3a: Object storage-based file system

S3A (URI scheme: s3a): the successor to the S3 Native (s3n) file system. S3A uses Amazon's libraries to interact with S3, which allows it to support larger files (no longer limited to 5GB), higher-performance operations, and more. It is intended as a drop-in replacement for S3 Native: all objects accessible from s3n:// URLs should also be accessible from s3a:// simply by replacing the URL scheme.

For reference, here is the S3HadoopConf class, where the schema determines which Hadoop file system implementation is used:

public class S3HadoopConf extends HadoopConf {
    private static final String HDFS_S3N_IMPL = "org.apache.hadoop.fs.s3native.NativeS3FileSystem";
    private static final String HDFS_S3A_IMPL = "org.apache.hadoop.fs.s3a.S3AFileSystem";
    protected static final String S3A_SCHEMA = "s3a";
    protected static final String DEFAULT_SCHEMA = "s3n";
    private String schema = DEFAULT_SCHEMA;

    @Override
    public String getFsHdfsImpl() {
        return switchHdfsImpl();
    }

    @Override
    public String getSchema() {
        return this.schema;
    }

    public void setSchema(String schema) {
        this.schema = schema;
    }

    public S3HadoopConf(String hdfsNameKey) {
        super(hdfsNameKey);
    }

    public static HadoopConf buildWithReadOnlyConfig(ReadonlyConfig config) {

        String bucketName = config.get(S3ConfigOptions.S3_BUCKET);
        S3HadoopConf hadoopConf = new S3HadoopConf(bucketName);
        if (bucketName.startsWith(S3A_SCHEMA)) {
            hadoopConf.setSchema(S3A_SCHEMA);
        }
        HashMap<String, String> s3Options = new HashMap<>();
        hadoopConf.putS3SK(s3Options, config);
        if (config.getOptional(S3ConfigOptions.S3_PROPERTIES).isPresent()) {
            config.get(S3ConfigOptions.S3_PROPERTIES)
                    .forEach((key, value) -> s3Options.put(key, String.valueOf(value)));
        }

        s3Options.put(
                S3ConfigOptions.S3A_AWS_CREDENTIALS_PROVIDER.key(),
                config.get(S3ConfigOptions.S3A_AWS_CREDENTIALS_PROVIDER).getProvider());
        s3Options.put(
                S3ConfigOptions.FS_S3A_ENDPOINT.key(), config.get(S3ConfigOptions.FS_S3A_ENDPOINT));
        hadoopConf.setExtraOptions(s3Options);
        return hadoopConf;
    }

    protected String switchHdfsImpl() {
        switch (this.schema) {
            case S3A_SCHEMA:
                return HDFS_S3A_IMPL;
            default:
                return HDFS_S3N_IMPL;
        }
    }

    private void putS3SK(Map<String, String> s3Options, ReadonlyConfig config) {
        if (!config.getOptional(S3ConfigOptions.S3_ACCESS_KEY).isPresent()
                && config.getOptional(S3ConfigOptions.S3_SECRET_KEY).isPresent()) {
            return;
        }
        String accessKey = config.get(S3ConfigOptions.S3_ACCESS_KEY);
        String secretKey = config.get(S3ConfigOptions.S3_SECRET_KEY);
        if (S3A_SCHEMA.equals(this.schema)) {
            s3Options.put("fs.s3a.access.key", accessKey);
            s3Options.put("fs.s3a.secret.key", secretKey);
            return;
        }
        // default s3n
        s3Options.put("fs.s3n.awsAccessKeyId", accessKey);
        s3Options.put("fs.s3n.awsSecretAccessKey", secretKey);
    }
}

This matches how Java deserialization treats static members:

When an object of a class with static members is deserialized, the static members are not restored to the state they had when the object was serialized; they keep whatever state the class currently has. Static variables belong to the class itself, not to any instance, so they are never written into the serialized stream.
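
A small, self-contained demo (hypothetical class names, not SeaTunnel code) makes this concrete: the instance field comes back from the stream, while the static field keeps whatever value the currently loaded class holds.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical class, loosely modeled on the S3Conf situation described above.
class SchemaHolder implements Serializable {
    private static final long serialVersionUID = 1L;
    static String SCHEMA = "s3a";      // imagine this was set via a factory method
    String instanceField = "payload";  // instance state travels with the object
}

public class StaticSerializationDemo {
    public static void main(String[] args) throws Exception {
        // Serialize an instance while SCHEMA is "s3a"
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(new SchemaHolder());
        }

        // Simulate what happens on another node/classloader, where the class
        // is initialized from scratch and the static falls back to a default.
        SchemaHolder.SCHEMA = "s3n";

        try (ObjectInputStream ois =
                new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            SchemaHolder restored = (SchemaHolder) ois.readObject();
            System.out.println(restored.instanceField); // "payload" -- restored
            System.out.println(SchemaHolder.SCHEMA);    // "s3n" -- NOT restored to "s3a"
        }
    }
}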

4. Solution

  • Remove the static modifier, and replace the parameterized constructor with a no-argument constructor plus a static factory method (see the sketch after this list).
  • Keep the static factory methods, but read the schema through the getSchema() method instead of referencing the static field directly.
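
Below is a minimal sketch of the idea behind the first option, under the same assumptions as the earlier sketch (the actual change lives in the SeaTunnel repository; for brevity the constructor here stays parameterized but private, since HadoopConf needs the bucket name). The schema becomes an instance field, so it is serialized together with the object and survives deserialization on other nodes; this is essentially the shape of the S3HadoopConf class quoted above.

// Sketch of the fix: instance state instead of static state; names are illustrative.
public class S3Conf extends HadoopConf {
    private static final String S3A_SCHEMA = "s3a";
    private static final String DEFAULT_SCHEMA = "s3n";

    // Instance field: written into the serialized object and restored with it.
    private String schema = DEFAULT_SCHEMA;

    private S3Conf(String hdfsNameKey) {
        super(hdfsNameKey);
    }

    // Static factory method replaces the parameterized public constructor.
    public static HadoopConf buildWithConfig(String bucketName) {
        S3Conf conf = new S3Conf(bucketName);
        if (bucketName.startsWith(S3A_SCHEMA)) {
            conf.schema = S3A_SCHEMA;
        }
        return conf;
    }

    @Override
    public String getSchema() {
        return schema; // always read through the getter, never a static field
    }
}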

The related issues have been submitted upstream for further review.

Future Research

1. Why does the error occur only in local mode?

It is speculated that in cluster mode, each operator is distributed across different machines, so the local cache is not used.

2. Why does running local mode from IDEA not reproduce the issue?

This could be due to differences in thread scheduling mechanisms between Windows and Linux.

Conclusion

I hope this walkthrough of analyzing and fixing the Apache SeaTunnel S3 file write error helps developers who run into similar issues. It is also a reminder to pay close attention to details such as class loading and serialization in distributed systems, to avoid unnecessary failures.
