暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

使用Apache SeaTunnel高效集成和管理SftpFil数据源

SeaTunnel 2024-09-19
297

点击蓝字

关注我们

本文为Apache SeaTunnel已经支持的SftpFile Source Connector使用文档,旨在帮助读者理解如何高效地使用SFTP文件源连接器,以便轻松地使用Apache SeaTunnel集成和管理您的SftpFil数据源。
SftpFile 是指通过 SFTP(Secure File Transfer Protocol)协议进行文件操作的对象或组件。在网络编程和数据集成中,SFTPFile 通常用来表示和操作存储在远程 SFTP 服务器上的文件。SFTP 是一种安全的文件传输协议,基于 SSH(Secure Shell)协议,提供了加密的文件传输和远程文件操作功能。




1



支持的引擎


SparkFlinkSeaTunnel Zeta




2



主要特性


支持功能和数据类型:

  • 批处理
  •  列投影
  •  并行处理
  •  文件格式类型
    •  文本
    •  CSV
    •  JSON
    •  Excel



3



描述


从 SFTP 文件服务器读取数据。




4



支持的数据源信息

使用 SftpFile 连接器,需要以下依赖项。可以通过 install-plugin.sh
 下载,也可以从 Maven 中央仓库获取。
数据源支持的版本依赖项
SftpFile
通用
下载(https://mvnrepository.com/artifact/org.apache.seatunnel/connector-file-sftp))
  • 提示
如果你使用的是 Spark/Flink,请确保 Spark/Flink 集群已经集成了 Hadoop。Hadoop 2.x 版本已通过测试。
如果使用 SeaTunnel 引擎,安装 SeaTunnel 引擎时会自动集成 Hadoop JAR 包。可以在 ${SEATUNNEL_HOME}/lib
 目录下检查这个 JAR 包是否存在。
为了支持更多的文件类型,我们做了一些妥协,所以在内部访问 Sftp 时我们使用了 HDFS 协议,这个连接器需要一些 Hadoop 依赖项,且仅支持 Hadoop 版2.9.X+ 版本。



5



数据类型映射


文件没有特定的类型列表,我们可以通过在配置中指定模式来指示要将哪个 SeaTunnel 数据类型转换为相应的数据。
SeaTunnel数据类型
STRING
SHORT
INT
BIGINT
BOOLEAN
DOUBLE
DECIMAL
FLOAT
DATE
TIME
TIMESTAMP
BYTES
ARRAY
MAP




6



Source选项


名称类型
必填
默认值
描述
host
字符串
-
目标 SFTP 主机地址
port
整数
-
目标 SFTP 端口号
user
字符串
-
目标 SFTP 用户名
password
字符串
-
目标 SFTP 密码
path
字符串
-
源文件路径
file_format_type
字符串
-
请查看下文的 #file_format_type
file_filter_pattern
字符串
-
用于文件过滤的过滤器模式。
delimiter
字符串
\001
字段分隔符,用于告诉连接器如何在读取文本文件时切割字段。默认 \001
,与 Hive 的默认分隔符相同。
parse_partition_from_path
布尔型
true
控制是否从文件路径中解析分区键和值。例如,如果从路径中读取文件 oss://hadoop-cluster/tmp/seatunnel/parquet/name=tyrantlucifer/age=26
那么文件中的每条记录将添加这两个字段:name agetyrantlucifer 26提示:不要在模式选项中定义分区字段
date_format
字符串
yyyy-MM-dd
日期类型的格式,用于告诉连接器如何将字符串转换为日期,支持以下格式:yyyy-MM-dd
 yyyy.MM.dd
 yyyy/MM/dd
,默认 yyyy-MM-dd
datetime_format
字符串
yyyy-MM-dd HH:mm:ss
日期时间类型的格式,用于告诉连接器如何将字符串转换为日期时间,支持以下格式:yyyy-MM-dd HH:mm:ss
 yyyy.MM.dd HH:mm:ss
 yyyy/MM/dd HH:mm:ss
 yyyyMMddHHmmss
,默认 yyyy-MM-dd HH:mm:ss
time_format
字符串
HH:mm:ss
时间类型的格式,用于告诉连接器如何将字符串转换为时间,支持以下格式:HH:mm:ss
 HH:mm:ss.SSS
,默认 HH:mm:ss
skip_header_row_number
长整型
0
跳过前几行,仅对 txt 和 csv 文件有效。例如,设置如下:skip_header_row_number = 2
那么 SeaTunnel 将跳过源文件的前两行。
sheet_name
字符串
-
读取工作簿的工作表名称,仅在文件格式为 Excel 时使用。
schema
配置项
-
请查看下文的 #schema
通用选项

-
Source 插件通用参数,请参考 Source通用选项 获取详细信息。

file_format_type [字符串]

支持以下文件类型:text
 csv
 parquet
 orc
 json
 excel
如果将文件类型指定为 json
,需要配置 Schema 模式选项,向连接器说明如何解析数据为你需要所需的 Row。例如:上游数据如下:
{"code": 200, "data": "get success", "success": true}

也可以将多个数据保存在一个文件中,并通过换行符进行分隔:
{"code": 200, "data": "get success", "success": true}
{"code": 300, "data": "get failed", "success": false}

需要按照以下方式配置 Schema:
schema {
fields {
code = int
data = string
success = boolean
}
}

连接器将生成以下数据:
CodeData
Success
200
获取成功
true
如果将文件类型指定为 parquet
 或 orc
,则无需指定模式选项,连接器可以自动查找上游数据的模式。


如果将文件类型指定为 text
 或 csv
,则可以选择是否指定模式信息或不指定。


例如,上游数据如下:
tyrantlucifer#26#male

如果不配置 Schema,Connector 将这样处理上游数据:
如果分配数据模式,则除了 CSV 文件类型外,还应分配选项 delimiter。
内容
tyrantlucifer#26#male
如果配置了数据 Schema,除了CSV文件类型,还需要配置选项分隔符。
需要配置 Schema 和分隔符如下:
delimiter = "#"
schema {
fields {
name = string
age = int
gender = string
}
}

连接器将生成以下数据:
姓名年龄
性别
tyrantlucifer
26

Schema [配置项]

fields [配置项]上游数据的 Schema。




7



如何创建 Sftp 数据同步任务


以下示例演示了如何创建一个数据同步任务,从 Sftp 读取数据并在本地客户端上打印出来:
# 设置要执行的任务的基本配置
env {
execution.parallelism = 1
job.mode = "BATCH"
}

# 创建连接到 Sftp 的源
source {
SftpFile {
host = "sftp"
port = 22
user = seatunnel
password = pass
path = "tmp/seatunnel/read/json"
file_format_type = "json"
result_table_name = "sftp"
schema = {
fields {
c_map = "map<string, string>"
c_array = "array<int>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_bytes = bytes
c_date = date
c_decimal = "decimal(38, 18)"
c_timestamp = timestamp
c_row = {
C_MAP = "map<string, string>"
C_ARRAY = "array<int>"
C_STRING = string
C_BOOLEAN = boolean
C_TINYINT = tinyint
C_SMALLINT = smallint
C_INT = int
C_BIGINT = bigint
C_FLOAT = float
C_DOUBLE = double
C_BYTES = bytes
C_DATE = date
C_DECIMAL = "decimal(38, 18)"
C_TIMESTAMP = timestamp
}
}
}
}
}

# 控制台打印读取的 Sftp 数据
sink {
Console {
parallelism = 1
}
}


活动推荐


Apache SeaTunnel Meetup | 9月24日



Apache SeaTunnel





Apache SeaTunnel 是一个分布式、高性能、易扩展、用于海量数据(离线&实时)同步和转化的数据集成平台

仓库地址: 
https://github.com/apache/seatunnel

网址:
https://seatunnel.apache.org/

Apache SeaTunnel 下载地址:
https://seatunnel.apache.org/download
 
衷心欢迎更多人加入!

我们相信,在「Community Over Code」(社区大于代码)、「Open and Cooperation」(开放协作)、「Meritocracy」(精英管理)、以及「多样性与共识决策」等 The Apache Way 的指引下,我们将迎来更加多元化和包容的社区生态,共建开源精神带来的技术进步!


我们诚邀各位有志于让本土开源立足全球的伙伴加入 SeaTunnel 贡献者大家庭,一起共建开源!

提交问题和建议:
https://github.com/apache/seatunnel/issues

贡献代码:
https://github.com/apache/seatunnel/pulls

订阅社区开发邮件列表 : 
dev-subscribe@seatunnel.apache.org

开发邮件列表:
dev@seatunnel.apache.org

加入 Slack:
https://join.slack.com/t/apacheseatunnel/shared_invite/zt-1kcxzyrxz-lKcF3BAyzHEmpcc4OSaCjQ

关注 Twitter: 
https://twitter.com/ASFSeaTunnel

精彩推荐




Apache SeaTunnel Connector 使用文档和使用案例有奖征稿来了!一起玩开源




融入大数据技术革新的洪流!2024 SeaTunnel Meetup 讲师招募开启




Apache SeaTunnel 入选 2024 开源数据工程生态系统全景图!



点击阅读原文了解更多⭐️!

文章转载自SeaTunnel,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论