
Flink SQL CDC Integration with Hudi

大数据启示录 2022-02-15


1: Environment Setup

# Versions
flink: flink-1.12.2
hudi: 0.9
git clone https://github.com/apache/hudi.git && cd hudi
# build with automatic Hive sync support
mvn clean package -DskipTests -Dscala-2.12 -Pflink-bundle-shade-hive2
// for Hive 3, use the profile -Pflink-bundle-shade-hive3
// for Hive 1, use the profile -Pflink-bundle-shade-hive1
Required packages: hudi-flink_2.12-0.9.0-SNAPSHOT.jar and hudi-hadoop-mr-0.9.0-SNAPSHOT.jar


Both packages have already been built and sit in their respective output directories. Place hudi-flink_2.12-0.9.0-SNAPSHOT.jar under flink/lib, and place hudi-hadoop-mr-0.9.0-SNAPSHOT.jar under /opt/cloudera/parcels/CDH/lib/hive/auxlib.
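A minimal sketch of the copy step, assuming the jars are still in their Maven build output directories and that $FLINK_HOME points at the Flink installation (both paths are assumptions; locate the actual jars with find if they differ):
# copy the Flink module jar onto Flink's classpath
cp hudi-flink/target/hudi-flink_2.12-0.9.0-SNAPSHOT.jar $FLINK_HOME/lib/
# copy the Hive input-format jar into Hive's aux lib so Hive can read Hudi files
cp hudi-hadoop-mr/target/hudi-hadoop-mr-0.9.0-SNAPSHOT.jar /opt/cloudera/parcels/CDH/lib/hive/auxlib/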

2: Basic Example

# start a YARN session; the returned application id is used in the next command
./bin/yarn-session.sh -d -nm hudi_wm
./bin/sql-client.sh embedded -s application_1625359036842_0992
# Create table 1
CREATE TABLE hudi_test (
id int,
name varchar(6),
ts timestamp(3),
PRIMARY KEY (id) NOT ENFORCED
)
WITH (
'connector' = 'hudi',
'table.type' = 'COPY_ON_WRITE',
'path' = 'hdfs://sd-cluster-03:8020/data/hudi/hudi_test',
'read.streaming.enabled' = 'true',
'read.streaming.check-interval' = '1' ,
'compaction.trigger.strategy'='num_commits',
'compaction.delta_commits' ='5'
);
# Create table 2: Hive sync is supposed to create the Hive table automatically, but testing shows it does not
CREATE TABLE hudi_test (
id int,
name varchar(6),
ts timestamp(3),
PRIMARY KEY (id) NOT ENFORCED
)
WITH (
'connector'='hudi',
 'path' = 'hdfs://xxx:8020/data/hudi/hudi_test',
'table.type'='COPY_ON_WRITE',
'hive_sync.enable'='true',
'hive_sync.metastore.uris' = 'thrift://xxx:9083',
'hive_sync.jdbc_url'='jdbc:hive2://xxx:10000',
'hive_sync.table'='hudi_test',
'hive_sync.db'='ods',
'hive_sync.partition_extractor_class'='org.apache.hudi.hive.MultiPartKeysValueExtractor'
);
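To check from the Hive side whether the sync actually created the table, a quick sketch (the ods database comes from the hive_sync.db option above):
-- run in beeline or the Hive CLI
show tables in ods;
-- if hudi_test is listed, inspect its definition
show create table ods.hudi_test;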
# Official template
CREATE TABLE t1(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
with(
'connector'='hudi',
'path' = 'hdfs://xxx.xxx.xxx.xxx:9000/t1',
'table.type'='COPY_ON_WRITE',  -- with merge_on_read, Hive sees no files until a parquet file has been generated
'hive_sync.enable'='true',     -- enable Hive sync
'hive_sync.metastore.uris' = 'thrift://ip:9083',  -- metastore port
'hive_sync.jdbc_url'='jdbc:hive2://ip:10000',     -- hiveServer port
'hive_sync.table'='t1',        -- name of the table created in Hive
'hive_sync.db'='testDB',       -- database name
'hive_sync.username'='root',
'hive_sync.password'='your password',
'hive_sync.partition_extractor_class'='org.apache.hudi.hive.MultiPartKeysValueExtractor'
);


# Insert data
INSERT INTO hudi_test VALUES
(1,'sy',TIMESTAMP '1970-01-01 00:00:01'),
(2,'wm',TIMESTAMP '1970-01-01 00:00:01'),
(3,'lzc',TIMESTAMP '1970-01-01 00:00:01'),
(4,'lc',TIMESTAMP '1970-01-01 00:00:01'),
(5,'txy',TIMESTAMP '1970-01-01 00:00:01'),
(6,'mdz',TIMESTAMP '1970-01-01 00:00:01');
# check the HDFS directory
hadoop fs -ls hdfs://xxx:8020/data/hudi/hudi_test
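Because read.streaming.enabled is set on hudi_test, the rows can also be verified with a streaming read from the SQL client (a minimal sketch; the result view keeps refreshing as new commits arrive):
select * from hudi_test;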


3: Dual-Stream Join

# Create the MySQL table
CREATE TABLE `hudi_users` (
`id` int NOT NULL AUTO_INCREMENT,
`name` varchar(6) NOT NULL DEFAULT '',
`ts` timestamp default CURRENT_TIMESTAMP not null,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
insert into hudi_users (name) values ('sy');
insert into hudi_users (name) values ('wm');
insert into hudi_users (name) values ('lzc');
insert into hudi_users (name) values ('lc');
insert into hudi_users (name) values ('txy');
insert into hudi_users (name) values ('mdz');
GRANT ALL PRIVILEGES ON *.* TO 'cdh'@'%' WITH GRANT OPTION;
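mysql-cdc tails the MySQL binlog, so the binlog must be enabled and in ROW format; a quick check from the MySQL client:
show variables like 'log_bin';        -- should be ON
show variables like 'binlog_format';  -- should be ROW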

Source

# Flink SQL: create the tables and build the data stream
CREATE TABLE hudi_users_mysql (
id int,
name varchar(6),
ts timestamp(3),
PRIMARY KEY (id) NOT ENFORCED
)
WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.1.215',
'port' = '3306',
'username' = 'cdh',
'password' = '123456',
'database-name' = 'test',
'table-name' = 'hudi_users'
);
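A quick sanity check of the CDC source from the SQL client (a sketch; this launches a streaming job that reads an initial snapshot and then follows the binlog):
select * from hudi_users_mysql;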
#kafka
CREATE TABLE hudi_users_kafka (
id int,
name varchar(6),
ts timestamp(3),
PRIMARY KEY (id) NOT ENFORCED
)
WITH (
'connector' = 'kafka',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'sd-kafka001:9092,sd-kafka002:9092,sd-kafka003:9092',
'format' = 'debezium-json',
'topic' = 'hudi_users'
);
# build the data stream
set table.dynamic-table-options.enabled=true;
INSERT INTO hudi_users_kafka SELECT * FROM hudi_users_mysql /*+ OPTIONS('server-id'='789') */;


# consume the data from Kafka
kafka-console-consumer.sh --bootstrap-server ds1:9092 --topic hudi_users --from-beginning

Sink

# enable checkpointing
Official configuration:
### checkpoint interval in milliseconds (5000 = 5s)
execution.checkpointing.interval: 5000
state.backend: filesystem
### state.backend: rocksdb
### state.backend.incremental: true
state.checkpoints.dir: hdfs://sd-cluster-03:8020/data/flink_checkpoints
state.savepoints.dir: hdfs://sd-cluster-03:8020/data/flink_savepoints
Test configuration (10s interval):
execution.checkpointing.interval: 10000
state.backend: filesystem
state.checkpoints.dir: hdfs://sd-cluster-03:8020/data/flink_checkpoints
state.savepoints.dir: hdfs://sd-cluster-03:8020/data/flink_savepoints
A parquet file is only generated after 5 Flink checkpoints (the default, which is configurable); until then there is only a .hoodie directory.
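A simple way to watch this happen is to list the table path repeatedly once the job is running (the path matches the sink table defined below):
hadoop fs -ls -R hdfs://sd-cluster-03:8020/data/hudi/hudi_users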

#sink MERGE_ON_READ/COPY_ON_WRITE
CREATE TABLE hudi_users_hdfs (
id int,
name varchar(6),
ts timestamp(3),
PRIMARY KEY (id) NOT ENFORCED
)
WITH (
'connector' = 'hudi',
'table.type' = 'COPY_ON_WRITE',
'path' = 'hdfs://sd-cluster-03:8020/data/hudi/hudi_users',
'read.streaming.enabled' = 'true',
'read.streaming.check-interval' = '1'
);
# build the data stream
INSERT INTO hudi_users_hdfs SELECT * FROM hudi_users_kafka ;
# Hive external table
CREATE EXTERNAL TABLE `ods.hudi_users`(
`_hoodie_commit_time` string,
`_hoodie_commit_seqno` string,
`_hoodie_record_key` string,
`_hoodie_partition_path` string,
`_hoodie_file_name` string,
`id` bigint,
`name` string,
`ts` bigint)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
'org.apache.hudi.hadoop.HoodieParquetInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
'hdfs://sd-cluster-03:8020/data/hudi/hudi_users';
# query
select id,name from ods.hudi_users;

Perform update, delete, and insert operations in MySQL and observe how the Hudi table changes.
update hudi_users set name='sy1' where id=1;

Hudi reflects the change in real time.

insert into hudi_users (name) values ('lmh');

Hudi reflects the change in real time.

delete from hudi_users where name='lmh';

Hudi reflects the change in real time.

# statistics query
set hive.input.format = org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
select id,name from ods.hudi_users where name='sy';
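Besides querying through Hive, a streaming read from the Flink SQL client is another way to watch the CDC changes arrive (a sketch; hudi_users_hdfs is the sink table defined above with read.streaming.enabled = 'true'):
select * from hudi_users_hdfs;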

