暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

必看!S3File Sink Connector 使用文档

SeaTunnel 2023-09-19
185

@

点击蓝字

关注我们

S3File 是一个用于管理 Amazon S3(Simple Storage Service)的 Python 模块。当前,Apache SeaTunnel 已经支持 S3File Sink Connector,为了更好地使用这个 Connector,有必要看一下这篇使用文档指南。


01




描述


将数据输出到 AWS S3 文件系统。
提示:
如果您使用的是 Spark/Flink,在使用此连接器之前,必须确保您的 Spark/Flink 集群已经集成了 Hadoop。Hadoop 2.x 版本已通过测试。
如果您使用的是 SeaTunnel Engine,它会在您下载和安装 SeaTunnel Engine 时自动集成 Hadoop JAR 包。您可以在 ${SEATUNNEL_HOME}/lib
 目录下确认这个 JAR 包是否存在。

02




主要特性

✅仅一次语义
默认情况下,我们使用 2PC 提交来确保 "仅一次语义"。
✅文件格式类型
    ✅文本 (text)
    CSV
    Parquet
    ORC
    JSON
    Excel

03




选项

选项
类型必需默认值 备注
path
string
-

bucket
string
-

fs.s3a.endpoint
string
-

fs.s3a.aws.credentials.provider
string
com.amazonaws.auth.InstanceProfileCredentialsProvider

access_key
string
-
仅在 fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider 时使用
access_secret
string
-
仅在 fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider 时使用
custom_filename
boolean
false
是否需要自定义文件名
file_name_expression
string
"${transactionId}"
仅在 custom_filename 为 true 时使用
filename_time_format
string
"yyyy.MM.dd"
仅在 custom_filename 为 true 时使用
file_format_type
string
"csv"

field_delimiter
string
'\001'
仅在 file_format 为 text 时使用
row_delimiter
string
"\n"
仅在 file_format 为 text 时使用
have_partition
boolean
false
是否需要处理分区
partition_by
array
-
仅在 have_partition 为 true 时使用
partition_dir_expression
string
"k0={v0}/k1={v1}/.../kn={vn}/"
仅在 have_partition 为 true 时使用
is_partition_field_write_in_file
boolean
false
仅在 have_partition 为 true 时使用
sink_columns
array

当此参数为空时,将写入所有从 "Transform" 或 "Source" 获取的字段
is_enable_transaction
boolean
true

batch_size
int
1000000

compress_codec
string
none

common-options
object
-

max_rows_in_memory
int
-
仅在 file_format 为 Excel 时使用
sheet_name
string
Sheet${Random number}
仅在 file_format 为 Excel 时使用

path [string]

目标目录路径是必需的。

bucket [string]

S3 文件系统的bucket地址,例如:s3n://seatunnel-test
,如果您使用的是 s3a
 协议,此参数应为 s3a://seatunnel-test

fs.s3a.endpoint [string]

fs s3a 端点

fs.s3a.aws.credentials.provider [string]

认证 s3a 的方式。目前我们仅支持 org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
 和 com.amazonaws.auth.InstanceProfileCredentialsProvider
关于凭证提供程序的更多信息,您可以参考 Hadoop AWS 文档

access_key [string]

S3 文件系统的访问密钥。如果未设置此参数,请确认凭证提供程序链可以正确验证,可参考 hadoop-aws

access_secret [string]

S3 文件系统的访问密钥。如果未设置此参数,请确认凭证提供程序链可以正确验证,可参考 hadoop-aws

hadoop_s3_properties [map]

如果需要添加其他选项,可以在这里添加并参考此 链接
hadoop_s3_properties {
"fs.s3a.buffer.dir" = "/data/st_test/s3a"
"fs.s3a.fast.upload.buffer" = "disk"
}

custom_filename [boolean]

是否自定义文件名。

file_name_expression [string]

仅在 custom_filename
 为 true
 时使用
file_name_expression
 描述了将创建到 path
 中的文件表达式。我们可以在 file_name_expression
 中添加变量 ${now} 
或 ${uuid}
,例如 test_${uuid}_${now}

${now}
 代表当前时间,其格式可以通过指定选项 filename_time_format
 来定义。
请注意,如果 is_enable_transaction
 为 true
,我们将在文件名的开头自动添加${transactionId}_

filename_time_format [string]

仅在 custom_filename
 为 true
 时使用
当 file_name_expression
 参数中的格式为 xxxx-${now}
 时,filename_time_format
 可以指定路径的时间格式,默认值为 yyyy.MM.dd
。常用的时间格式列于下表中:
符号
描述
y
M
d
月中的天数
H
一天中的小时 (0-23)
m
小时中的分钟
s
分钟中的秒数

file_format_type [string]

我们支持以下文件类型:
  • 文本 (text)
  • JSON
  • CSV
  • ORC
  • Parquet
  • Excel
请注意,最终文件名将以文件格式的后缀结尾,文本文件的后缀是 txt

field_delimiter [string]

数据行中列之间的分隔符。仅在 file_format
 为 text 时需要。

row_delimiter [string]

文件中行之间的分隔符。仅在 file_format
 为 text 时需要。

have_partition [boolean]

是否需要处理分区。

partition_by [array]

仅在 have_partition
 为 true
 时使用。
基于选定字段对分区数据进行分区。

partition_dir_expression [string]

仅在 have_partition
 为 true
 时使用。
如果指定了 partition_by
,我们将根据分区信息生成相应的分区目录,并将最终文件放在分区目录中。
默认的 partition_dir_expression
 是 ${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/
k0
 是第一个分区字段,v0
 是第一个分区字段的值。

is_partition_field_write_in_file [boolean]

仅在 have_partition
 为 true
 时使用。
如果 is_partition_field_write_in_file
 为 true
,分区字段及其值将写入数据文件中。
例如,如果您想要写入 Hive 数据文件,其值应为 false

sink_columns [array]

需要写入文件的哪些列,默认值为从 "Transform" 或 "Source" 获取的所有列。
字段的顺序决定了实际写入文件的顺序。

is_enable_transaction [boolean]

如果 is_enable_transaction
 为 true,我们将确保在写入目标目录时数据不会丢失或重复。
请注意,如果 is_enable_transaction
 为 true
,我们将在文件头部自动添加 ${transactionId}_
目前仅支持 true

batch_size [int]

文件中的最大行数。对于 SeaTunnel Engine,文件中的行数由 batch_size
 和 checkpoint.interval
 共同决定。如果 checkpoint.interval
 的值足够大,当文件中的行数大于 batch_size
 时,写入器将写入文件。如果 checkpoint.interval
较小,则在新的检查点触发时,写入器将创建一个新文件。

compress_codec [string]

文件的压缩编解码器及其支持的详细信息如下:
  • txt: lzo
     none
  • JSON: lzo
     none
  • CSV: lzo
     none
  • ORC: lzo
     snappy
     lz4
     zlib
     none
  • Parquet: lzo
     snappy
     lz4
     gzip
     brotli
     zstd
     none
提示:Excel 类型不支持任何压缩格式。

常见选项

请参考 Sink Common Options 获取 Sink 插件的常见参数详细信息。

max_rows_in_memory [int]

当文件格式为 Excel 时,可以缓存在内存中的数据项的最大数量。

sheet_name [string]

工作簿的工作表名称。

044




示例

对于文本文件格式,具有 have_partition
custom_filename
sink_columns
 和 com.amazonaws.auth.InstanceProfileCredentialsProvider
 的配置示例:
S3File {
bucket = "s3a://seatunnel-test"
tmp_path = "/tmp/seatunnel"
path="/seatunnel/text"
fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn"
fs.s3a.aws.credentials.provider="com.amazonaws.auth.InstanceProfileCredentialsProvider"
file_format_type = "text"
field_delimiter = "\t"
row_delimiter = "\n"
have_partition = true
partition_by = ["age"]
partition_dir_expression = "${k0}=${v0}"
is_partition_field_write_in_file = true
custom_filename = true
file_name_expression = "${transactionId}_${now}"
filename_time_format = "yyyy.MM.dd"
sink_columns = ["name","age"]
is_enable_transaction=true
hadoop_s3_properties {
"fs.s3a.buffer.dir" = "/data/st_test/s3a"
"fs.s3a.fast.upload.buffer" = "disk"
}
}

对于 Parquet 文件格式,仅需用 org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
进行配置:
S3File {
bucket = "s3a://seatunnel-test"
tmp_path = "/tmp/seatunnel"
path="/seatunnel/parquet"
fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn"
fs.s3a.aws.credentials.provider="org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
access_key = "xxxxxxxxxxxxxxxxx"
secret_key = "xxxxxxxxxxxxxxxxx"
file_format_type = "parquet"
hadoop_s3_properties {
"fs.s3a.buffer.dir" = "/data/st_test/s3a"
"fs.s3a.fast.upload.buffer" = "disk"
}
}

对于 orc 文件仅需配置 org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
S3File {
bucket = "s3a://seatunnel-test"
tmp_path = "/tmp/seatunnel"
path="/seatunnel/orc"
fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn"
fs.s3a.aws.credentials.provider="org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
access_key = "xxxxxxxxxxxxxxxxx"
secret_key = "xxxxxxxxxxxxxxxxx"
file_format_type = "orc"
}


05




更新日志


2.3.0-beta 2022-10-20

  • 添加 S3File Sink 连接器

2.3.0 2022-12-30

  • Bug修复
    • 当上游字段为空时会抛出 NullPointerException
    • Sink 列映射失败
    • 从状态中恢复写入器时直接获取事务失败 (3258)
    • 修复了以下导致数据写入文件失败的错误:
  • 功能
    • 允许用户添加额外的 Hadoop-S3 参数
    • 允许使用 S3A 协议
    • 解耦 Hadoop-AWS 依赖
    • 支持 S3A 协议 (3632)
    • 支持设置每个文件的批处理大小 (3625)
    • 设置 S3 AK 为可选项 (3688)

06




下一版本

  • [优化]支持文件压缩(3699)

Apache SeaTunnel





Apache SeaTunnel 是一个分布式、高性能、易扩展、用于海量数据(离线&实时)同步和转化的数据集成平台

仓库地址: 
https://github.com/apache/seatunnel

网址:
https://seatunnel.apache.org/

Apache SeaTunnel 下载地址:
https://seatunnel.apache.org/download
 
衷心欢迎更多人加入!

我们相信,在「Community Over Code」(社区大于代码)、「Open and Cooperation」(开放协作)、「Meritocracy」(精英管理)、以及「多样性与共识决策」等 The Apache Way 的指引下,我们将迎来更加多元化和包容的社区生态,共建开源精神带来的技术进步!

我们诚邀各位有志于让本土开源立足全球的伙伴加入 SeaTunnel 贡献者大家庭,一起共建开源!

提交问题和建议:
https://github.com/apache/seatunnel/issues

贡献代码:
https://github.com/apache/seatunnel/pulls

订阅社区开发邮件列表 : 
dev-subscribe@seatunnel.apache.org

开发邮件列表:
dev@seatunnel.apache.org

加入 Slack:
https://join.slack.com/t/apacheseatunnel/shared_invite/zt-1kcxzyrxz-lKcF3BAyzHEmpcc4OSaCjQ

关注 Twitter: 
https://twitter.com/ASFSeaTunnel

精彩推荐




Apache SeaTunnel Connector 使用文档和使用案例有奖征稿来了!一起玩开源




Vertica Sink Connector 使用必看手册




讲师征集令 | Apache SeaTunnel Meetup 分享嘉宾火热招募中!



一键三连-点赞在看转发⭐️!

文章转载自SeaTunnel,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论