
点击蓝字 关注我们

01
支持引擎
Spark Flink SeaTunnel Zeta
02
主要功能
批处理 精确一次性
pollNext调用中读取分片中的所有数据。将读取的分片保存在快照中。
列投影 文件格式类型 文本 CSV Parquet ORC JSON Excel
03
描述
04
支持的源数据信息
| 数据源 | 支持的版本 |
05
依赖关系
如果您使用 Spark/Flink,请确保您的 Spark/Flink 集群已经集成了 Hadoop。已测试的 Hadoop 版本是 2.x。
如果您使用 SeaTunnel Zeta,则在下载和安装 SeaTunnel Zeta 时会自动集成 Hadoop jar 文件。您可以在 ${SEATUNNEL_HOME}/lib
目录下查看这些 jar 文件。要使用此连接器,您需要将hadoop-aws-3.1.4.jar
和aws-java-sdk-bundle-1.11.271.jar
放入${SEATUNNEL_HOME}/lib
目录。
06
数据类型映射
文本 CSV Parquet ORC JSON Excel
JSON 文件类型
{"code": 200, "data": "get success", "success": true}
{"code": 200, "data": "get success", "success": true}
{"code": 300, "data": "get failed", "success": false}
schema {
fields {
code = int
data = string
success = boolean
}
}
| Code | Data | Success |
文本或 CSV 文件类型
tyrantlucifer#26#male
| Content |
delimiter。
delimiter = "#"
schema {
fields {
name = string
age = int
gender = string
}
}
| 姓名 | 年龄 | 性别 |
Orc 文件类型
| Orc数据类型 | SeaTunnel数据类型type |
VARCHAR CHAR | |
Parquet文件类型
parquetorc, 则不需要配置 schema 选项,连接器可以自动找到上游数据的模式。
| Orc数据类型 | SeaTunnel数据类型 |
DECIMAL | |
07
选项
| 名称 | 类型 | 必填 | 默认值 | 描述 |
textcsvparquetorcjsonexcel。 | ||||
s3n://seatunnel-test,如果您使用 s3a协议,此参数应为 s3a://seatunnel-test。 | ||||
org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider和 com.amazonaws.auth.InstanceProfileCredentialsProvider。有关凭证提供程序的更多信息,您可以查看 Hadoop AWS 文档。 | ||||
textcsvparquetorcjsonexcel。如果用户在读取 textjsoncsv文件时想要使用此功能,必须配置 "schema" 选项。 | ||||
fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider时使用。 | ||||
fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider时使用。 | ||||
\001,与 Hive 的默认分隔符相同。 | ||||
s3n://hadoop-cluster/tmp/seatunnel/parquet/name=tyrantlucifer/age=26读取文件。文件中的每条记录数据将添加这两个字段:name="tyrantlucifer",age=16。 | ||||
yyyy-MM-ddyyyy.MM.ddyyyy/MM/dd。默认为 yyyy-MM-dd。 | ||||
yyyy-MM-dd HH:mm:ssyyyy.MM.dd HH:mm:ssyyyy/MM/dd HH:mm:ssyyyyMMddHHmmss。 | ||||
HH:mm:ssHH:mm:ss.SSS。 | ||||
skip_header_row_number = 2。然后 SeaTunnel 将从源文件中跳过前两行。 | ||||
08
示例
在此示例中,我们从 S3 路径 s3a://seatunnel-test/seatunnel/text
中读取数据,文件类型为 ORC。我们使用org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
进行身份验证,因此需要提供access_key
和secret_key
。文件中的所有列都将被读取并发送到接收器。
# 定义运行时环境
env {
# 您可以在这里设置 Flink 配置
execution.parallelism = 1
job.mode = "BATCH"
}
source {
S3File {
path = "/seatunnel/text"
fs.s3a.endpoint = "s3.cn-north-1.amazonaws.com.cn"
fs.s3a.aws.credentials.provider = "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
access_key = "xxxxxxxxxxxxxxxxx"
secret_key = "xxxxxxxxxxxxxxxxx"
bucket = "s3a://seatunnel-test"
file_format_type = "orc"
}
}
transform {
# 如果您想要获取有关如何配置 seatunnel 和查看变换插件的完整列表的更多信息,请转至 [seatunnel.apache.org/docs/category/transform-v2](https://seatunnel.apache.org/docs/category/transform-v2)。
}
sink {
jdbc {
url = "jdbc:mysql://localhost:3306/test"
driver = "com.mysql.cj.jdbc.Driver"
user = "root"
password = "123456"
# 基于数据库表名自动生成 SQL 语句
generate_sink_sql = true
database = test
table = test_table
}
}
使用 InstanceProfileCredentialsProvider
进行身份验证,文件类型为 JSON,因此需要配置 schema 选项。
source {
S3File {
path = "/seatunnel/json"
bucket = "s3a://seatunnel-test"
fs.s3a.endpoint = "s3.cn-north-1.amazonaws.com.cn"
fs.s3a.aws.credentials.provider = "com.amazonaws.auth.InstanceProfileCredentialsProvider"
file_format_type = "json"
schema {
fields {
id = int
name = string
}
}
}
}
使用 InstanceProfileCredentialsProvider
进行身份验证,文件类型为 JSON,具有五个字段 (id
,name
,age
,sex
,type
), 因此需要配置 schema 选项。在此作业中,我们只需将id
和name
列发送到 MySQL。
env {
# 您可以在此处设置 Flink 配置
execution.parallelism = 1
job.mode = "BATCH"
}
source {
S3File {
path = "/seatunnel/json"
bucket = "s3a://seatunnel-test"
fs.s3a.endpoint = "s3.cn-north-1.amazonaws.com.cn"
fs.s3a.aws.credentials.provider = "com.amazonaws.auth.InstanceProfileCredentialsProvider"
file_format_type = "json"
read_columns = ["id", "name"]
schema {
fields {
id = int
name = string
age = int
sex = int
type = string
}
}
}
}
transform {
# 如果您想要获取有关如何配置 seatunnel 和查看变换插件的完整列表的更多信息,请转至 [seatunnel.apache.org/docs/category/transform-v2](https://seatunnel.apache.org/docs/category/transform-v2)。
}
sink {
jdbc {
url = "jdbc:mysql://localhost:3306/test"
driver = "com.mysql.cj.jdbc.Driver"
user = "root"
password = "123456"
# 基于数据库表名自动生成 SQL 语句
generate_sink_sql = true
database = test
table = test_table
}
}
09
更新日志
2.3.0-beta 2022-10-20
添加了 S3File 源连接器
下一版本
[功能] 支持 S3A 协议(3632) 允许用户添加其他 hadoop-s3 参数 允许使用 S3A 协议 解耦 hadoop-aws 依赖关系 [功能] 将 S3 AK 设置为可选项 (3688)
新手入门

最佳实践

测试报告
Apache SeaTunnel
文章转载自SeaTunnel,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




