Versions:
SeaTunnel 2.3.3
InfluxDB 2.7.6
Doris 2.1.3 rc09
1
Preparation
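connector-hudi-2.3.3.jar and
connector-datahub-2.3.3.jar.
seatunnel-api-2.3.3.jar,
seatunnel-transforms-v2-2.3.3.jar,
mysql-connector-java-8.0.28.jar,
jersey-client-1.19.4.jar: these four jars must be added. Without them no data can be synced, and running the sync script fails immediately with a missing-class error.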
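Because a missing jar only shows up as a class-not-found error at job start, it can help to check for the four jars before launching. The sketch below assumes they are copied into SeaTunnel's `lib` directory (the article does not name the directory) and that the file names match the versions listed above; `missing_jars` is a hypothetical helper, not part of SeaTunnel.

```python
from pathlib import Path
from typing import Iterable, List

# The four jars the article says must be present (versions as listed above).
REQUIRED_JARS = [
    "seatunnel-api-2.3.3.jar",
    "seatunnel-transforms-v2-2.3.3.jar",
    "mysql-connector-java-8.0.28.jar",
    "jersey-client-1.19.4.jar",
]


def missing_jars(lib_dir: str, required: Iterable[str] = REQUIRED_JARS) -> List[str]:
    """Return the required jar names that are not present in lib_dir."""
    # Collect the file names of every .jar directly inside lib_dir.
    present = {p.name for p in Path(lib_dir).glob("*.jar")}
    # Keep the original order so the report is stable.
    return [jar for jar in required if jar not in present]
```

For example, `missing_jars("/opt/seatunnel/lib")` (a hypothetical install path) returns an empty list when all four jars are in place, and otherwise lists the ones still to be copied.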
2
Sync process and pitfalls
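Create an InfluxDB v1-compatible authorization (a username and password scoped to the bucket), since the SeaTunnel InfluxDB source below logs in with a username and password:
influx v1 auth create -o orgName --read-bucket bucketId --username=username
Or, granting both write and read access and setting the password directly:
influx v1 auth create -o "<organization name>" --write-bucket bucketId --read-bucket bucketId --username=<username> --password=<password>
(bucketId is the bucket ID, passed without quotes.)
To delete an authorization:
influx v1 auth delete --id '<id>'
where the ID is the one returned by influx v1 auth list, as shown in the figure below: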
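A v1 authorization is used through InfluxDB's 1.x-compatible `/query` endpoint, where the username and password travel as the `u` and `p` query parameters and `epoch` sets the timestamp precision. The sketch below only builds such a URL so the new credentials can be tried by hand (for example with `urllib.request.urlopen` or curl) before writing the SeaTunnel job. It assumes the bucket is reachable under its own name as the v1 database (for example through a DBRP mapping); `build_v1_query_url` is a hypothetical helper.

```python
from urllib.parse import urlencode


def build_v1_query_url(base_url: str, database: str, username: str,
                       password: str, influxql: str, epoch: str = "h") -> str:
    """Build a GET URL for the InfluxDB 1.x-compatible /query endpoint."""
    params = {
        "db": database,   # assumption: the bucket name resolves as a v1 database
        "u": username,    # v1 username from `influx v1 auth create`
        "p": password,    # matching v1 password
        "q": influxql,    # InfluxQL statement to run
        "epoch": epoch,   # timestamp precision of the returned time column
    }
    # urlencode percent-encodes quotes, spaces and other special characters.
    return f"{base_url.rstrip('/')}/query?{urlencode(params)}"
```

A quick smoke test might pass `'SELECT * FROM "prometheus_remote_write" WHERE time > now() - 1h LIMIT 1'` as the query; a JSON result instead of an authorization error suggests the credentials are usable.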

env {
  execution.parallelism = 1
  job.mode = "BATCH"
  checkpoint.interval = 10000
}
source {
  influxdb {
    url = "http://X.X.X.X:8086"
    token = "<your token>" # optional
    org = "<your organization name>"
    bucket = "<your bucket>" # optional
    database = "<your bucket>"
    username = "<InfluxDB v1 username created with influx v1 auth create>"
    password = "<password of that InfluxDB v1 user>"
    epoch = "H" # several precisions are available; see the official docs
    query_timeout_sec = 600
    measurement = "prometheus_remote_write" # the measurement (table) to read
    fields = ["node_cpu_seconds_total", "node_memory_MemTotal_bytes"] # optional; list your own fields
    sql = """SELECT node_cpu_seconds_total as system_cpu_usage,cpu as process_occupy_physical_memory_size,job as create_dept,node_memory_MemTotal_bytes as process_read_written_file_system_total_bytes,node_memory_MemAvailable_bytes as process_open_file_describe_quantity,time as create_time FROM "prometheus_remote_write" where time > now() - 1h"""
    where = " where time > now() - 1h"
    # In my tests, the fields selected by the SQL above had to be renamed (aliased),
    # or else the Doris table's column names had to match the InfluxDB 2 field names exactly;
    # otherwise the values turn into nulls during the transform step. I have not worked out
    # why yet and will add an explanation once I have. The Doris column types must also match
    # the types of the fields queried from InfluxDB 2, or the data will not be written to Doris.
    # The schema block below redefines the fields and types returned by the InfluxDB 2 query.
    schema {
      fields {
        #node_cpu_seconds_total = FLOAT
        system_cpu_usage = FLOAT
        process_occupy_physical_memory_size = INT
        create_dept = STRING
        process_read_written_file_system_total_bytes = FLOAT
        process_open_file_describe_quantity = FLOAT
        create_time = BIGINT
      }
    }
  }
}
sink {
  Doris {
    fenodes = "X.X.X.X:8030"
    username = "<username>"
    password = "<password>"
    table.identifier = "sbyw_data_acquisition.sbyw_application_process_type_tmp"
    sink.label-prefix = "test-cdc"
    sink.enable-2pc = "true"
    sink.enable-delete = "true"
    sink.max-retries = 3
    batch_size = 10000
    result_table_name = "sbyw_application_process_type_tmp"
    doris.config {
      format = "json"
      read_json_by_line = "true"
    }
  }
}
transform {
  FieldMapper {
    source_table_name = "prometheus_remote_write"
    result_table_name = "sbyw_application_process_type_tmp"
    field_mapper = {
      #node_cpu_seconds_total = system_cpu_usage
      system_cpu_usage = system_cpu_usage
      process_occupy_physical_memory_size = process_occupy_physical_memory_size
      process_read_written_file_system_total_bytes = process_read_written_file_system_total_bytes
      process_open_file_describe_quantity = process_open_file_describe_quantity
      create_time = create_time
      create_dept = create_dept
    }
  }
}
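Since mismatched names or types between the `schema.fields` block and the Doris table were the main pitfall here, a pre-flight comparison can catch them before a run. In the sketch below the source side copies the schema above, while the Doris column list and the `COMPATIBLE` type table are hypothetical and should be adjusted to your own table definition (for example, the output of `DESC` on the Doris table).

```python
from typing import Dict, List

# Field names and types declared in the SeaTunnel `schema.fields` block above.
SEATUNNEL_SCHEMA = {
    "system_cpu_usage": "FLOAT",
    "process_occupy_physical_memory_size": "INT",
    "create_dept": "STRING",
    "process_read_written_file_system_total_bytes": "FLOAT",
    "process_open_file_describe_quantity": "FLOAT",
    "create_time": "BIGINT",
}

# Hypothetical table of Doris column types accepted for each SeaTunnel type.
COMPATIBLE = {
    "FLOAT": {"FLOAT"},
    "INT": {"INT"},
    "STRING": {"STRING", "VARCHAR", "TEXT"},
    "BIGINT": {"BIGINT"},
}


def schema_problems(source: Dict[str, str], doris_columns: Dict[str, str]) -> List[str]:
    """List source fields missing from Doris or whose types do not line up."""
    problems = []
    for name, st_type in source.items():
        doris_type = doris_columns.get(name)
        if doris_type is None:
            problems.append(f"{name}: no Doris column with this name")
            continue
        # Drop length/precision suffixes such as VARCHAR(64) before comparing.
        base_type = doris_type.split("(")[0].strip().upper()
        if base_type not in COMPATIBLE.get(st_type, {st_type}):
            problems.append(f"{name}: {st_type} vs Doris {doris_type}")
    return problems
```

An empty result means every renamed field has a same-named Doris column of an accepted type; anything else is a likely cause of null values or rows that never reach Doris.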
./bin/seatunnel.sh -c ./config/v1.batch.config_tmp.template
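With `epoch = "H"` in the source, `create_time` is declared as BIGINT, so, assuming SeaTunnel passes the hour precision through unchanged, Doris stores it as a count of whole hours since the Unix epoch rather than as a timestamp. A minimal sketch of converting such values in either direction when reading or checking the synced data (both helpers are hypothetical):

```python
from datetime import datetime, timezone


def hours_epoch_to_utc(hours: int) -> datetime:
    """Convert an hour-precision Unix timestamp (epoch = "H") to a UTC datetime."""
    return datetime.fromtimestamp(hours * 3600, tz=timezone.utc)


def utc_to_hours_epoch(dt: datetime) -> int:
    """Whole hours elapsed since 1970-01-01T00:00:00Z (minutes and seconds dropped)."""
    return int(dt.timestamp()) // 3600
```

For instance, `utc_to_hours_epoch(datetime(2024, 1, 1, tzinfo=timezone.utc))` gives 473352. If finer resolution is needed in Doris, a smaller epoch unit in the source would presumably be the place to change it.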






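This article is reprinted from SeaTunnel. If you believe it infringes your rights, please email contact@modb.pro with supporting evidence; once verified, Modb (墨天轮) will remove the content immediately.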




