暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

解决SeaTunnel 2.3.4版本写入S3文件报错问题

SeaTunnel 2024-07-04
221

在使用Apache SeaTunnel时,我遇到了一个写入S3文件的报错问题。通过深入调试和分析,找到了问题所在,并提出了相应的解决方案。

本文将详细介绍报错情况、参考资料、解决思路以及后续研究方向,希望对大家有帮助!

一、详细报错

2024-04-12 20:44:18,647 ERROR [.c.FileSinkAggregatedCommitter] [hz.main.generic-operation.thread-43] - commit aggregatedCommitInfo error, aggregatedCommitInfo = FileAggregatedCommitInfo(transactionMap={/xugurtp/seatunnel/tmp/seatunnel/831147703474847745/476b6a6fc7/T_831147703474847745_476b6a6fc7_0_1={/xugurtp/seatunnel/tmp/seatunnel/831147703474847745/476b6a6fc7/T_831147703474847745_476b6a6fc7_0_1/NON_PARTITION/output_params_0.json=/xugurtp/seatunnel/tmp/6af80b38f3434aceb573cc65b9cd12216a/39111/output_params_0.json}}, partitionDirAndValuesMap={}) java.lang.IllegalStateException: Connection pool shut down

二、参考资料

  • HADOOP-16027:https://issues.apache.org/jira/browse/HADOOP-16027
  • CSDN Blog:https://blog.csdn.net/a18262285324/article/details/112470363
  • AWS SDK Java Issue #2337:https://github.com/aws/aws-sdk-java/issues/2337
  • Amazon SQS Java Messaging Lib Issue #96:https://github.com/awslabs/amazon-sqs-java-messaging-lib/issues/96
  • 博客园:https://www.cnblogs.com/xhy-shine/p/10772736.html

三、解决思路

1. 远程调试

在本地IDEA中进行debug未发现报错,但在服务器上执行时却报错,因此决定进行远程debug。执行以下命令添加JVM参数:

-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005

实际命令是:

 java -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 -Dhazelcast.client.config=/opt/module/seatunnel-2.3.4/config/hazelcast-client.yaml -Dseatunnel.config=/opt/module/seatunnel-2.3.4/config/seatunnel.yaml -Dhazelcast.config=/opt/module/seatunnel-2.3.4/config/hazelcast.yaml -Dlog4j2.configurationFile=/opt/module/seatunnel-2.3.4/config/log4j2_client.properties -Dseatunnel.logs.path=/opt/module/seatunnel-2.3.4/logs -Dseatunnel.logs.file_name=seatunnel-starter-client -Xms1024m -Xmx1024m -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/seatunnel/dump/zeta-client -XX:MaxMetaspaceSize=1g -XX:+UseG1GC -cp /opt/module/seatunnel-2.3.4/lib/*:/opt/module/seatunnel-2.3.4/starter/seatunnel-starter.jar org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient -e local --config job/s3_sink.conf -cn xxx

2. 定位问题

通过调试发现问题出在hadoop-aws
使用的缓存连接池对象。关键在于if判断部分,如果上游传递了fs.s3a.impl.disable.cache=true
,则不使用缓存。深入debug发现:有时hadoopConf.getSchema
获取的不是s3a
而是s3n

s3和s3n s3a的区别

  • s3:基于块的文件系统
  • s3n:基于对象存储的文件系统,支持高达5GB的对象
  • s3a:基于对象存储的文件系统,支持高达5TB的对象,并具有更高的性能

在配置文件中设置的是s3a
,但实际获取到的是s3n
,这显然不合理。

3. 深入挖掘

我仔细看了一下报错的截图发现:

确实是commit
期间报的错:那么也就是说commit
初始化s3conf
并没有走buildWithConfig
方法,而是用的默认值,而且我根本没找到commit
里面有new s3Conf
的代码,再次debug看看谁去重新初始化了S3Conf

定位到这里就很头疼了,已经涉及到引擎层而非插件层面了,涉及到classloader
的使用以及反序列化操作:

反序列化代码:

        logicalDag =
                CustomClassLoadedObject.deserializeWithCustomClassLoader(
                        nodeEngine.getSerializationService(),
                        classLoader,
                        jobImmutableInformation.getLogicalDag());

很明显可以看出,S3Conf
(静态类)被重新初始化了,导致SHEMA被重新赋值成s3n
了。

因为s3conf
它本身的属性都是静态的,而对classloader
反序列化是时会重新加载静态属性的,所以导致shema
被重新赋值为默认s3n
了。

综上所述

除了source
sink
阶段,AggregatedCommit
操作也会写入s3File
。错误发生在commit
期间,说明初始化S3Conf
时并没有走buildWithConfig
方法,而是使用了默认值。

由于S3Conf
类的属性是静态的,反序列化时会重新加载静态属性,导致SCHEMA
被重新赋值为默认的s3n

资料参考:https://wiki.apache.org/hadoop/AmazonS3

s3:基于Block块的文件系统

S3 Block FileSystem(URI scheme:s3)由S3支持的基于块的文件系统。文件存储为块,就像HDFS一样。这样可以有效地实现重命名。此文件系统需要您为文件系统专用一个存储桶 - 您不应使用包含文件的现有存储桶,或将其他文件写入同一存储区。此文件系统存储的文件大于5GB,但不能与其他S3工具进行互操作。

s3n:基于对象存储的文件系统

S3 Native FileSystem(URI scheme:s3n)用于在S3上读取和写入常规文件的本机文件系统。这个文件系统的优点是您可以访问使用其他工具编写的S3上的文件。相反,其他工具可以访问使用Hadoop编写的文件。缺点是S3的文件大小限制为5GB。

s3a:基于对象存储的文件系统

S3A(URI方案:s3a)是S3 Native,s3n fs的继承者,S3a:系统使用Amazon的库与S3进行交互。这允许S3A支持较大的文件(不超过5GB的限制),更高的性能操作等等。文件系统旨在替代S3 Native:从s3n:// URL可访问的所有对象也应该通过替换URL模式从s3a访问。

public class S3Conf extends HadoopConf {
    private static final String HDFS_S3N_IMPL = "org.apache.hadoop.fs.s3native.NativeS3FileSystem";
    private static final String HDFS_S3A_IMPL = "org.apache.hadoop.fs.s3a.S3AFileSystem";
    private static final String S3A_SCHEMA = "s3a";
    private static final String DEFAULT_SCHEMA = "s3n";
    private static String SCHEMA = DEFAULT_SCHEMA;

    @Override
    public String getFsHdfsImpl() {
        return switchHdfsImpl();
    }

    @Override
    public String getSchema() {
        return SCHEMA;
    }

    private S3Conf(String hdfsNameKey) {
        super(hdfsNameKey);
    }

    public static HadoopConf buildWithConfig(Config config) {

        HadoopConf hadoopConf = new S3Conf(config.getString(S3ConfigOptions.S3_BUCKET.key()));
        String bucketName = config.getString(S3ConfigOptions.S3_BUCKET.key());
        if (bucketName.startsWith(S3A_SCHEMA)) {
            SCHEMA = S3A_SCHEMA;
        }
        HashMap<String, String> s3Options = new HashMap<>();
        putS3SK(s3Options, config);
        if (CheckConfigUtil.isValidParam(config, S3ConfigOptions.S3_PROPERTIES.key())) {
            config.getObject(S3ConfigOptions.S3_PROPERTIES.key())
                    .forEach((key, value) -> s3Options.put(key, String.valueOf(value.unwrapped())));
        }

        s3Options.put(
                S3ConfigOptions.S3A_AWS_CREDENTIALS_PROVIDER.key(),
                config.getString(S3ConfigOptions.S3A_AWS_CREDENTIALS_PROVIDER.key()));
        s3Options.put(
                S3ConfigOptions.FS_S3A_ENDPOINT.key(),
                config.getString(S3ConfigOptions.FS_S3A_ENDPOINT.key()));
        hadoopConf.setExtraOptions(s3Options);
        return hadoopConf;
    }

    public static HadoopConf buildWithReadOnlyConfig(ReadonlyConfig readonlyConfig) {
        Config config = readonlyConfig.toConfig();
        HadoopConf hadoopConf = new S3Conf(readonlyConfig.get(S3ConfigOptions.S3_BUCKET));
        String bucketName = readonlyConfig.get(S3ConfigOptions.S3_BUCKET);
        if (bucketName.startsWith(S3A_SCHEMA)) {
            SCHEMA = S3A_SCHEMA;
        }
        HashMap<String, String> s3Options = new HashMap<>();
        putS3SK(s3Options, config);
        if (CheckConfigUtil.isValidParam(config, S3ConfigOptions.S3_PROPERTIES.key())) {
            config.getObject(S3ConfigOptions.S3_PROPERTIES.key())
                    .forEach((key, value) -> s3Options.put(key, String.valueOf(value.unwrapped())));
        }

        s3Options.put(
                S3ConfigOptions.S3A_AWS_CREDENTIALS_PROVIDER.key(),
                readonlyConfig.get(S3ConfigOptions.S3A_AWS_CREDENTIALS_PROVIDER).getProvider());
        s3Options.put(
                S3ConfigOptions.FS_S3A_ENDPOINT.key(),
                readonlyConfig.get(S3ConfigOptions.FS_S3A_ENDPOINT));
        hadoopConf.setExtraOptions(s3Options);
        return hadoopConf;
    }

    private String switchHdfsImpl() {
        switch (SCHEMA) {
            case S3A_SCHEMA:
                return HDFS_S3A_IMPL;
            default:
                return HDFS_S3N_IMPL;
        }
    }

    private static void putS3SK(Map<String, String> s3Options, Config config) {
        if (!CheckConfigUtil.isValidParam(config, S3ConfigOptions.S3_ACCESS_KEY.key())
                && !CheckConfigUtil.isValidParam(config, S3ConfigOptions.S3_SECRET_KEY.key())) {
            return;
        }
        String accessKey = config.getString(S3ConfigOptions.S3_ACCESS_KEY.key());
        String secretKey = config.getString(S3ConfigOptions.S3_SECRET_KEY.key());
        if (S3A_SCHEMA.equals(SCHEMA)) {
            s3Options.put("fs.s3a.access.key", accessKey);
            s3Options.put("fs.s3a.secret.key", secretKey);
            return;
        }
        // default s3n
        s3Options.put("fs.s3n.awsAccessKeyId", accessKey);
        s3Options.put("fs.s3n.awsSecretAccessKey", secretKey);
    }
}

参考了反序列的知识才了解到这个情况:

当对一个包含静态成员的类进行反序列化时,静态成员不会恢复为之前的状态,而是保持在其初始状态。任何静态变量的值都是与该类本身相关的,

4. 解决方案

  • 1.去掉stastic
    修饰,把有参构造换成无参构造和静态工厂方法:

  • 2.保留stastic
    静态方法,使用getSchema
    方法代替静态属性调用:

由此可见,代码中的细节问题,即使看似微不足道,也可能引发严重的后果。一个简单的静态修饰符的误用,不仅能导致程序行为异常,更可能导致系统稳定性和安全性的大问题。

相关的issues已提交,大家有兴趣可以查看:

  • [bigfix][S3 File]:Change the [SCHEMA] attribute of the [S3CONF class] to be non-static to avoid being reassigned after deserialization by LeonYoah · Pull Request #6717 · apache/seatunnel (github.com)

  • [Bug] [S3File] [zeta-local] Error writing to S3File in version 2.3.4:: Java lang. An IllegalStateException: Connection pool shut down · Issue #6678 · apache/seatunnel (github.com)

四、有待研究

1.为什么只有local模式会报错:

推测可能是cluster模式是分布式的,每个算子分布在不同的机器上,所以本地缓存不会被使用,类似于没有走缓存。

2.为什么本地IDEA执行local模式却没问题

可能是Windows和Linux的线程调度机制不同导致的。

结论

通过这次对Apache SeaTunnel S3 File写入报错问题的分析与解决,希望这些经验能帮助到遇到类似问题的开发者,同时也提醒大家在处理分布式系统时注意细节问题,以免引发不必要的故障。


同步Demo

 MySQL→Doris
MySQLCDC
MySQL→Hive

新手入门

 SeaTunnel 让数据集成变得 So easy!  3 分钟入门指南
从 0 到 1 快速入门 Apache SeaTunnel 
初探 Apache SeaTunnel / 深入理解 Apache SeaTunnel

 MySQL 同步到 Hive / 从MySQL同步到StarRocks
通过 SeaTunnel 将数据写入 OSS-HDFS 
MySQL 到 Elasticsearch 实时同步解决方案

启动 SeaTunnel / 3 分钟部署 SeaTunnel Zeta 
 部署 Apache SeaTunnel 分布式集群
Apache SeaTunnel Web部署指南
基于Apache SeaTunnel构建CDC数据同步管道
【用户投稿】Apache SeaTunnel 2.3.3+Web 1.0.0版本安装部署
【安装部署】Apache SeaTunnel 和 Web快速安装详解
【保姆级教程】使用SeaTunnel同步Kafka的数据到ClickHouse

最佳实践

 OPPO 清风 天翼云 马蜂窝
孩子王 哔哩哔哩 唯品会
众安保险 兆原数通

测试报告


 性能测试报告:SeaTunnel 批量同步数据比 GLUE 快 420%!
最新性能对比报告:SeaTunnel 是 Airbyte 30 倍!
比DataX快20%!SeaTunnel同步计算引擎性能测试全新发布
SeaTunnel 与 DataX 、Sqoop、Flume、Flink CDC 对比


Apache SeaTunnel





Apache SeaTunnel 是一个分布式、高性能、易扩展、用于海量数据(离线&实时)同步和转化的数据集成平台


仓库地址: 
https://github.com/apache/seatunnel

网址:
https://seatunnel.apache.org/

Apache SeaTunnel 下载地址:
https://seatunnel.apache.org/download
衷心欢迎更多人加入!

我们相信,在「Community Over Code」(社区大于代码)、「Open and Cooperation」(开放协作)、「Meritocracy」(精英管理)、以及「多样性与共识决策」等 The Apache Way 的指引下,我们将迎来更加多元化和包容的社区生态,共建开源精神带来的技术进步!

我们诚邀各位有志于让本土开源立足全球的伙伴加入 SeaTunnel 贡献者大家庭,一起共建开源!

提交问题和建议:
https://github.com/apache/seatunnel/issues

贡献代码:
https://github.com/apache/seatunnel/pulls

订阅社区开发邮件列表 : 
dev-subscribe@seatunnel.apache.org

开发邮件列表:
dev@seatunnel.apache.org

加入 Slack:
https://join.slack.com/t/apacheseatunnel/shared_invite/zt-1kcxzyrxz-lKcF3BAyzHEmpcc4OSaCjQ

关注 Twitter: 
https://twitter.com/ASFSeaTunnel


文章转载自SeaTunnel,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论