暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

AWS S3 文件系统如何在数分钟内导入到自己的数据处理流程中?

SeaTunnel 2024-03-26
792

点击蓝字 关注我们


本文档介绍了 S3File Source Connector,它是用于从 AWS S3 文件系统读取数据的连接器,已经在 Apache SeaTunnel 中得到支持。该连接器支持多种数据引擎,包括 Spark、Flink 和 SeaTunnel Zeta,并提供了丰富的功能集,如批处理、精确一次性处理、列投影等。如果你正好需要将数据从 AWS S3 文件系统导入到自己的数据处理流程中,本文档涵盖了连接器的关键特性、支持的数据源信息、依赖关系、数据类型映射以及详细的配置选项,带你了解如何配置 S3File Source 连接器以读取不同文件类型的数据,并在不同引擎上运行。

01




支持引擎

  • Spark
  • Flink
  • SeaTunnel Zeta


02




主要功能

  • 批处理 
  • 精确一次性
在 pollNext
 调用中读取分片中的所有数据。将读取的分片保存在快照中。
  •  列投影
  • 文件格式类型
    • 文本
    •  CSV
    •  Parquet
    •  ORC
    •  JSON
    •  Excel

03




描述

从 AWS S3 文件系统读取数据。

04




支持的源数据信息

数据源
支持的版本
S3
当前


05




依赖关系

如果您使用 Spark/Flink,请确保您的 Spark/Flink 集群已经集成了 Hadoop。已测试的 Hadoop 版本是 2.x。
如果您使用 SeaTunnel Zeta,则在下载和安装 SeaTunnel Zeta 时会自动集成 Hadoop jar 文件。您可以在 ${SEATUNNEL_HOME}/lib
 目录下查看这些 jar 文件。要使用此连接器,您需要将 hadoop-aws-3.1.4.jar
 和 aws-java-sdk-bundle-1.11.271.jar
放入 ${SEATUNNEL_HOME}/lib
 目录。


06




数据类型映射

数据类型映射与正在读取的文件类型相关。我们支持以下文件类型:
  • 文本
  • CSV
  • Parquet
  • ORC
  • JSON
  • Excel

JSON 文件类型

如果您将文件类型指定为 JSON,则还需要指定 schema 选项,以告诉连接器如何解析数据以生成所需的行。
例如:
上游数据如下:
{"code": 200, "data": "get success", "success": true}
您还可以在同一个文件中保存多个数据片段,并通过换行符分隔它们:
{"code": 200, "data": "get success", "success": true}
{"code": 300, "data": "get failed", "success": false}


在这种情况下,您应该将 schema 配置如下:
schema {
fields {
code = int
data = string
success = boolean
}
}

连接器将生成以下数据:
Code
Data
Success
200
get success
true

文本或 CSV 文件类型

如果将文件类型指定为文本或 CSV,可以选择是否指定 schema 信息。
例如,上游数据如下:
tyrantlucifer#26#male
如果不指定数据模式,连接器将上游数据视为以下内容:
Content
tyrantlucifer#26#male
如果指定数据模式,除 CSV 文件类型外,还应指定选项 delimiter
您应该将 schema 和 delimiter 配置如下:
delimiter = "#"
schema {
fields {
name = string
age = int
gender = string
}
}

连接器将生成以下数据:
姓名
年龄
性别
tyrantlucifer
26
male

Orc 文件类型

如果将文件类型指定为 Parquet 或 ORC,则不需要配置 schema 选项,连接器可以自动找到上游数据的模式。
以下是 ORC 数据类型到 SeaTunnel 数据类型的映射:
Orc数据类型SeaTunnel数据类型type
BOOLEAN
BOOLEAN
INT
INT
BYTE
BYTE
SHORT
SHORT
LONG
LONG
FLOAT
FLOAT
DOUBLE
DOUBLE
BINARY
BINARY
STRING
VARCHAR
CHAR
STRING
DATE
LOCAL_DATE_TYPE
TIMESTAMP
LOCAL_DATE_TIME_TYPE
DECIMAL
DECIMAL
LIST(STRING)
STRING_ARRAY_TYPE
LIST(BOOLEAN)
BOOLEAN_ARRAY_TYPE
LIST(TINYINT)
BYTE_ARRAY_TYPE
LIST(SMALLINT)
SHORT_ARRAY_TYPE
LIST(INT)
INT_ARRAY_TYPE
LIST(BIGINT)
LONG_ARRAY_TYPE
LIST(FLOAT)
FLOAT_ARRAY_TYPE
LIST(DOUBLE)
DOUBLE_ARRAY_TYPE
Map<K,V>
MapType, This type of K and V will transform to SeaTunnel type
STRUCT
SeaTunnelRowType

Parquet文件类型

如果你将文件类型指定为 parquet
 orc
, 则不需要配置 schema 选项,连接器可以自动找到上游数据的模式。
Orc数据类型SeaTunnel数据类型
INT_8
BYTE
INT_16
SHORT
DATE
DATE
TIMESTAMP_MILLIS
TIMESTAMP
INT64
LONG
INT96
TIMESTAMP
BINARY
BYTES
FLOAT
FLOAT
DOUBLE
DOUBLE
BOOLEAN
BOOLEAN
FIXED_LEN_BYTE_ARRAY
TIMESTAMP
DECIMAL
DECIMAL
DECIMAL
LIST(STRING)
STRING_ARRAY_TYPE
LIST(BOOLEAN)
BOOLEAN_ARRAY_TYPE
LIST(TINYINT)
BYTE_ARRAY_TYPE
LIST(SMALLINT)
SHORT_ARRAY_TYPE
LIST(INT)
INT_ARRAY_TYPE
LIST(BIGINT)
LONG_ARRAY_TYPE
LIST(FLOAT)
FLOAT_ARRAY_TYPE
LIST(DOUBLE)
DOUBLE_ARRAY_TYPE
Map<K,V>
MapType, This type of K and V will transform to SeaTunnel type
STRUCT
SeaTunnelRowType



07




选项


名称
类型
必填
默认值
描述
path
string
-
需要读取的 S3 路径可以包含子路径,但子路径需要满足特定的格式要求。具体要求可以参考 "parse_partition_from_path" 选项。
file_format_type
string
-
文件类型,支持以下文件类型:text
 csv
 parquet
 orc
json
 excel
bucket
string
-
S3 文件系统的存储桶地址,例如:s3n://seatunnel-test
,如果您使用 s3a
 协议,此参数应为 s3a://seatunnel-test
fs.s3a.endpoint
string
-
fs s3a 终端节点。
fs.s3a.aws.credentials.provider
string
com.amazonaws.auth.InstanceProfileCredentialsProvider
用于身份验证 S3A 的方式。我们目前只支持 org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
和 com.amazonaws.auth.InstanceProfileCredentialsProvider
。有关凭证提供程序的更多信息,您可以查看 Hadoop AWS 文档
read_columns
list
-
数据源的读取列列表,用户可以使用它来实现字段投影。支持的文件类型列投影如上所示:text
 csv
 parquet
 orc
 json
excel
。如果用户在读取 text
 json
 csv
 文件时想要使用此功能,必须配置 "schema" 选项。
access_key
string
-
仅在 fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
 时使用。
access_secret
string
-
仅在 fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
 时使用。
hadoop_s3_properties
map
-
如果需要添加其他选项,您可以在此处添加并参考此链接
delimiter
string
\001
字段分隔符,用于告诉连接器在读取文本文件时如何切分字段。默认为 \001
,与 Hive 的默认分隔符相同。
parse_partition_from_path
boolean
true
控制是否从文件路径中解析分区键和值。例如,如果您从路径 s3n://hadoop-cluster/tmp/seatunnel/parquet/name=tyrantlucifer/age=26
读取文件。文件中的每条记录数据将添加这两个字段:name="tyrantlucifer",age=16。
date_format
string
yyyy-MM-dd
日期类型格式,用于告诉连接器如何将字符串转换为日期,支持以下格式:yyyy-MM-dd
 yyyy.MM.dd
 yyyy/MM/dd
。默认为 yyyy-MM-dd
datetime_format
string
yyyy-MM-dd HH:mm:ss
日期时间类型格式,用于告诉连接器如何将字符串转换为日期时间,支持以下格式:yyyy-MM-dd HH:mm:ss
 yyyy.MM.dd HH:mm:ss
 yyyy/MM/dd HH:mm:ss
 yyyyMMddHHmmss
time_format
string
HH:mm:ss
时间类型格式,用于告诉连接器如何将字符串转换为时间,支持以下格式:HH:mm:ss
 HH:mm:ss.SSS
skip_header_row_number
long
0
跳过前几行,但仅适用于 txt 和 csv 文件。例如,设置如下:skip_header_row_number = 2
。然后 SeaTunnel 将从源文件中跳过前两行。
schema
config
-
上游数据的模式。
common-options

-
源插件的通用参数,请参阅 源通用选项 以获取详细信息。
sheet_name
string
-
读取工作簿的表,仅在文件格式为 Excel 时使用。

08




示例

  1. 在此示例中,我们从 S3 路径 s3a://seatunnel-test/seatunnel/text
     中读取数据,文件类型为 ORC。我们使用 org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
     进行身份验证,因此需要提供 access_key
     和 secret_key
    。文件中的所有列都将被读取并发送到接收器。
# 定义运行时环境
env {
# 您可以在这里设置 Flink 配置
execution.parallelism = 1
job.mode = "BATCH"
}

source {
S3File {
path = "/seatunnel/text"
fs.s3a.endpoint = "s3.cn-north-1.amazonaws.com.cn"
fs.s3a.aws.credentials.provider = "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
access_key = "xxxxxxxxxxxxxxxxx"
secret_key = "xxxxxxxxxxxxxxxxx"
bucket = "s3a://seatunnel-test"
file_format_type = "orc"
}
}

transform {
# 如果您想要获取有关如何配置 seatunnel 和查看变换插件的完整列表的更多信息,请转至 [seatunnel.apache.org/docs/category/transform-v2](https://seatunnel.apache.org/docs/category/transform-v2)。
}

sink {
jdbc {
url = "jdbc:mysql://localhost:3306/test"
driver = "com.mysql.cj.jdbc.Driver"
user = "root"
password = "123456"
# 基于数据库表名自动生成 SQL 语句
generate_sink_sql = true
database = test
table = test_table
}
}


  1. 使用 InstanceProfileCredentialsProvider
     进行身份验证,文件类型为 JSON,因此需要配置 schema 选项。
source {
S3File {
path = "/seatunnel/json"
bucket = "s3a://seatunnel-test"
fs.s3a.endpoint = "s3.cn-north-1.amazonaws.com.cn"
fs.s3a.aws.credentials.provider = "com.amazonaws.auth.InstanceProfileCredentialsProvider"
file_format_type = "json"
schema {
fields {
id = int
name = string
}
}
}
}

  1. 使用 InstanceProfileCredentialsProvider
     进行身份验证,文件类型为 JSON,具有五个字段 (id
    name
    age
    sex
    type
    ), 因此需要配置 schema 选项。在此作业中,我们只需将 id
     和name
     列发送到 MySQL。
env {
# 您可以在此处设置 Flink 配置
execution.parallelism = 1
job.mode = "BATCH"
}

source {
S3File {
path = "/seatunnel/json"
bucket = "s3a://seatunnel-test"
fs.s3a.endpoint = "s3.cn-north-1.amazonaws.com.cn"
fs.s3a.aws.credentials.provider = "com.amazonaws.auth.InstanceProfileCredentialsProvider"
file_format_type = "json"
read_columns = ["id", "name"]
schema {
fields {
id = int
name = string
age = int
sex = int
type = string
}
}
}
}

transform {
# 如果您想要获取有关如何配置 seatunnel 和查看变换插件的完整列表的更多信息,请转至 [seatunnel.apache.org/docs/category/transform-v2](https://seatunnel.apache.org/docs/category/transform-v2)。
}

sink {
jdbc {
url = "jdbc:mysql://localhost:3306/test"
driver = "com.mysql.cj.jdbc.Driver"
user = "root"
password = "123456"
# 基于数据库表名自动生成 SQL 语句
generate_sink_sql = true
database = test
table = test_table
}
}


09




更新日志

2.3.0-beta 2022-10-20

  • 添加了 S3File 源连接器

下一版本

  • [功能] 支持 S3A 协议(3632)
    • 允许用户添加其他 hadoop-s3 参数
    • 允许使用 S3A 协议
    • 解耦 hadoop-aws 依赖关系
  • [功能] 将 S3 AK 设置为可选项 (3688)

新手入门

 SeaTunnel 让数据集成变得 So easy!  3 分钟入门指南
从 0 到 1 快速入门 Apache SeaTunnel 
初探 Apache SeaTunnel / 深入理解 Apache SeaTunnel

 MySQL 同步到 Hive / 从MySQL同步到StarRocks
通过 SeaTunnel 将数据写入 OSS-HDFS 
MySQL 到 Elasticsearch 实时同步解决方案

启动 SeaTunnel / 3 分钟部署 SeaTunnel Zeta 
 部署 Apache SeaTunnel 分布式集群
Apache SeaTunnel Web部署指南
基于Apache SeaTunnel构建CDC数据同步管道

最佳实践

 OPPO 清风 天翼云 马蜂窝
孩子王 哔哩哔哩 唯品会
众安保险

测试报告


 性能测试报告:SeaTunnel 批量同步数据比 GLUE 快 420%!
最新性能对比报告:SeaTunnel 是 Airbyte 30 倍!
比DataX快20%!SeaTunnel同步计算引擎性能测试全新发布
SeaTunnel 与 DataX 、Sqoop、Flume、Flink CDC 对比

Apache SeaTunnel





Apache SeaTunnel 是一个分布式、高性能、易扩展、用于海量数据(离线&实时)同步和转化的数据集成平台


仓库地址: 
https://github.com/apache/seatunnel

网址:
https://seatunnel.apache.org/

Apache SeaTunnel 下载地址:
https://seatunnel.apache.org/download
 
衷心欢迎更多人加入!

我们相信,在「Community Over Code」(社区大于代码)、「Open and Cooperation」(开放协作)、「Meritocracy」(精英管理)、以及「多样性与共识决策」等 The Apache Way 的指引下,我们将迎来更加多元化和包容的社区生态,共建开源精神带来的技术进步!

我们诚邀各位有志于让本土开源立足全球的伙伴加入 SeaTunnel 贡献者大家庭,一起共建开源!

提交问题和建议:
https://github.com/apache/seatunnel/issues

贡献代码:
https://github.com/apache/seatunnel/pulls

订阅社区开发邮件列表 : 
dev-subscribe@seatunnel.apache.org

开发邮件列表:
dev@seatunnel.apache.org

加入 Slack:
https://join.slack.com/t/apacheseatunnel/shared_invite/zt-1kcxzyrxz-lKcF3BAyzHEmpcc4OSaCjQ

关注 Twitter: 
https://twitter.com/ASFSeaTunnel

文章转载自SeaTunnel,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论