
@
点击蓝字
关注我们

S3File 是一个用于管理 Amazon S3(Simple Storage Service)的 Python 模块。当前,Apache SeaTunnel 已经支持 S3File Sink Connector,为了更好地使用这个 Connector,有必要看一下这篇使用文档指南。
01
描述
${SEATUNNEL_HOME}/lib目录下确认这个 JAR 包是否存在。
02
主要特性
03
选项
| 选项 | 类型 | 必需 | 默认值 | 备注 |
path [string]
bucket [string]
s3n://seatunnel-test,如果您使用的是
s3a协议,此参数应为
s3a://seatunnel-test。
fs.s3a.endpoint [string]
fs.s3a.aws.credentials.provider [string]
org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider和
com.amazonaws.auth.InstanceProfileCredentialsProvider。
access_key [string]
access_secret [string]
hadoop_s3_properties [map]
hadoop_s3_properties {
"fs.s3a.buffer.dir" = "/data/st_test/s3a"
"fs.s3a.fast.upload.buffer" = "disk"
}
custom_filename [boolean]
file_name_expression [string]
custom_filename为
true时使用
file_name_expression描述了将创建到
path中的文件表达式。我们可以在
file_name_expression中添加变量
${now} 或
${uuid},例如
test_${uuid}_${now},
${now}代表当前时间,其格式可以通过指定选项
filename_time_format来定义。
is_enable_transaction为
true,我们将在文件名的开头自动添加
${transactionId}_。
filename_time_format [string]
custom_filename为
true时使用
file_name_expression参数中的格式为
xxxx-${now}时,
filename_time_format可以指定路径的时间格式,默认值为
yyyy.MM.dd。常用的时间格式列于下表中:
| 符号 | 描述 |
file_format_type [string]
文本 (text) JSON CSV ORC Parquet Excel
txt。
field_delimiter [string]
file_format为 text 时需要。
row_delimiter [string]
file_format为 text 时需要。
have_partition [boolean]
partition_by [array]
have_partition为
true时使用。
partition_dir_expression [string]
have_partition为
true时使用。
partition_by,我们将根据分区信息生成相应的分区目录,并将最终文件放在分区目录中。
partition_dir_expression是
${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/。
k0是第一个分区字段,
v0是第一个分区字段的值。
is_partition_field_write_in_file [boolean]
have_partition为
true时使用。
is_partition_field_write_in_file为
true,分区字段及其值将写入数据文件中。
false。
sink_columns [array]
字段的顺序决定了实际写入文件的顺序。
is_enable_transaction [boolean]
is_enable_transaction为 true,我们将确保在写入目标目录时数据不会丢失或重复。
is_enable_transaction为
true,我们将在文件头部自动添加
${transactionId}_。
true。
batch_size [int]
batch_size和
checkpoint.interval共同决定。如果
checkpoint.interval的值足够大,当文件中的行数大于
batch_size时,写入器将写入文件。如果
checkpoint.interval较小,则在新的检查点触发时,写入器将创建一个新文件。
compress_codec [string]
txt: lzo
noneJSON: lzo
noneCSV: lzo
noneORC: lzo
snappy
lz4
zlib
noneParquet: lzo
snappy
lz4
gzip
brotli
zstd
none
常见选项
max_rows_in_memory [int]
sheet_name [string]
044
示例
have_partition、
custom_filename、
sink_columns和
com.amazonaws.auth.InstanceProfileCredentialsProvider的配置示例:
S3File {
bucket = "s3a://seatunnel-test"
tmp_path = "/tmp/seatunnel"
path="/seatunnel/text"
fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn"
fs.s3a.aws.credentials.provider="com.amazonaws.auth.InstanceProfileCredentialsProvider"
file_format_type = "text"
field_delimiter = "\t"
row_delimiter = "\n"
have_partition = true
partition_by = ["age"]
partition_dir_expression = "${k0}=${v0}"
is_partition_field_write_in_file = true
custom_filename = true
file_name_expression = "${transactionId}_${now}"
filename_time_format = "yyyy.MM.dd"
sink_columns = ["name","age"]
is_enable_transaction=true
hadoop_s3_properties {
"fs.s3a.buffer.dir" = "/data/st_test/s3a"
"fs.s3a.fast.upload.buffer" = "disk"
}
}
org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider进行配置:
S3File {
bucket = "s3a://seatunnel-test"
tmp_path = "/tmp/seatunnel"
path="/seatunnel/parquet"
fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn"
fs.s3a.aws.credentials.provider="org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
access_key = "xxxxxxxxxxxxxxxxx"
secret_key = "xxxxxxxxxxxxxxxxx"
file_format_type = "parquet"
hadoop_s3_properties {
"fs.s3a.buffer.dir" = "/data/st_test/s3a"
"fs.s3a.fast.upload.buffer" = "disk"
}
}
org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider:
S3File {
bucket = "s3a://seatunnel-test"
tmp_path = "/tmp/seatunnel"
path="/seatunnel/orc"
fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn"
fs.s3a.aws.credentials.provider="org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
access_key = "xxxxxxxxxxxxxxxxx"
secret_key = "xxxxxxxxxxxxxxxxx"
file_format_type = "orc"
}
05
更新日志
2.3.0-beta 2022-10-20
添加 S3File Sink 连接器
2.3.0 2022-12-30
Bug修复 当上游字段为空时会抛出 NullPointerException Sink 列映射失败 从状态中恢复写入器时直接获取事务失败 (3258) 修复了以下导致数据写入文件失败的错误: 功能 允许用户添加额外的 Hadoop-S3 参数 允许使用 S3A 协议 解耦 Hadoop-AWS 依赖 支持 S3A 协议 (3632) 支持设置每个文件的批处理大小 (3625) 设置 S3 AK 为可选项 (3688)
06
下一版本
[优化]支持文件压缩(3699)
Apache SeaTunnel
精彩推荐
一键三连-点赞在看转发⭐️!

文章转载自SeaTunnel,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




