暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

flinkcdc 同步数据到 doris分析---完全验证ok

原创 途er 2022-12-13
4289

1. 参考下面的 docker-compose.yml,执行 `docker-compose up -d` 即可运行

# Demo stack: Apache Doris (one FE + one BE), Flink 1.13.2 (jobmanager,
# taskmanager, sql-client) and a Debezium example MySQL, all attached to one
# user-defined bridge network with fixed IPv4 addresses.
version: '2.1'
services:

  doris-fe:
    image: registry.cn-qingdao.aliyuncs.com/dataease/doris:v1.1.0-0704
    container_name: doris-fe
    ports:
      - 8030:8030
      - 9030:9030
    environment:
      - DORIS_ROLE=fe-leader
      - FE_ROLE=fe-leader
      - DORIS_HOME=/opt/doris/fe
      # NOTE(review): no leading slash here, unlike DORIS_HOME - confirm intended.
      - DORIS_TEST=opt/doris/fe
    volumes:
      - d:/dataease/data/fe:/opt/doris/fe/doris-meta
      - d:/dataease/logs/fe:/opt/doris/fe/log
      - d:/dataease/conf/fe.conf:/opt/doris/fe/conf/fe.conf
      - d:/dataease/doris-meta:/home/doris-meta
    networks:
      dataease-network:
        ipv4_address: 192.168.43.198
    restart: always
    depends_on:
      - doris-be
    #######fe-leader#################
    # condition: service_healthy
    # healthcheck:
    ## test: [ "CMD-SHELL","curl -sS 127.0.0.1:8030 || exit 1" ]
    # interval: 10s

  doris-be:
    image: registry.cn-qingdao.aliyuncs.com/dataease/doris:v1.1.0-0704
    container_name: doris-be
    ports:
      - 9050:9050
      - 9060:9060
      - 9070:9070
      - 8040:8040
    environment:
      - DORIS_ROLE=be
      - DORIS_HOME=/opt/doris/be
      # NOTE(review): no leading slash here, unlike DORIS_HOME - confirm intended.
      - DORIS_TEST=opt/doris/be
    volumes:
      - d:/dataease/data/be:/opt/doris/be/storage
      - d:/dataease/logs/be:/opt/doris/be/log
      - d:/dataease/conf/be.conf:/opt/doris/be/conf/be.conf
      # A second mount onto the same container path is rejected as a duplicate
      # mount point, so only one host directory may back /opt/doris/be/storage.
      # - d:/dataease/data/be/storage:/opt/doris/be/storage
    # depends_on:
    #   - doris-fe
    networks:
      dataease-network:
        ipv4_address: 192.168.43.199
    restart: always

  # kettle:
  #   image: registry.cn-qingdao.aliyuncs.com/dataease/kettle:8.3-v1.2
  #   container_name: kettle
  #   ports:
  #     - 18080:18080
  #   volumes:
  #     - d:/dataease/conf/:d:/dataease/conf
  #     - d:/dataease/data/kettle:d:/dataease/data/kettle
  #   networks:
  #     - dataease-network
  #   restart: always

  ################### flink -mysql -es #######################################
  sql-client:
    user: flink:flink
    image: yuxialuo/flink-sql-client:1.13.2.v1
    depends_on:
      - jobmanager
      - mysql
    privileged: true
    environment:
      FLINK_JOBMANAGER_HOST: jobmanager
      MYSQL_HOST: mysql
    volumes:
      # - shared-tmpfs:/tmp/iceberg
      - D:/flink-1.13.2:/opt/flink
    networks:
      dataease-network:
        ipv4_address: 192.168.43.200

  jobmanager:
    user: flink:flink
    image: flink:1.13.2-scala_2.11
    ports:
      - "8081:8081"
    command: jobmanager
    privileged: true
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
    volumes:
      # - shared-tmpfs:/tmp/iceberg
      - D:/flink-1.13.2:/opt/flink
    networks:
      dataease-network:
        ipv4_address: 192.168.43.201

  taskmanager:
    user: flink:flink
    image: flink:1.13.2-scala_2.11
    depends_on:
      - jobmanager
    command: taskmanager
    privileged: true
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2
    volumes:
      # - shared-tmpfs:/tmp/iceberg
      - D:/flink-1.13.2:/opt/flink
    networks:
      dataease-network:
        ipv4_address: 192.168.43.202

  mysql:
    image: debezium/example-mysql:1.1
    ports:
      - "3306:3306"
    privileged: true
    environment:
      - MYSQL_ROOT_PASSWORD=123456
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw
    networks:
      dataease-network:
        ipv4_address: 192.168.43.205

networks:
  dataease-network:
    driver: bridge
    ipam:
      driver: default
      config:
        # /24 keeps the network address (192.168.43.0) consistent with its
        # mask; every static address above and the gateway lie inside it.
        - subnet: 192.168.43.0/24
          gateway: 192.168.43.1




2. 然后依次执行下面的语句(Doris 的建表语句在 Doris 中执行,MySQL 的建表语句在 MySQL 中执行,Flink 的 source/sink 表及 INSERT 语句在 Flink 的 sql-client 中执行),即可实时看到数据同步



-- Doris: detail-model (DUPLICATE KEY) table holding raw log rows.
CREATE TABLE test_log
(
    `timestamp`  DATETIME NOT NULL COMMENT "日志时间",
    `type`       INT      NOT NULL COMMENT "日志类型",
    `error_code` INT               COMMENT "错误码",
    `op_id`      BIGINT            COMMENT "负责人id"
)
DUPLICATE KEY(`timestamp`, `type`)
DISTRIBUTED BY HASH(`timestamp`) BUCKETS 10
PROPERTIES
(
    "replication_num" = "1" -- use a single replica (the default is three)
);


-- Insert one sample row into the log table and read it back.
INSERT INTO test_log (`timestamp`, `type`, `error_code`, `op_id`)
VALUES ('2022-12-10 10:10:10', 2, 3, 333);

SELECT
    `timestamp`,
    `type`,
    `error_code`,
    `op_id`
FROM test_log;

-- Generic UPDATE syntax sample; test_order is not created anywhere in this script.
UPDATE test_order SET order_status = '待发货' WHERE order_id = 1;

-- This UPDATE is rejected because test_log is not a UNIQUE KEY table:
--   errCode = 2, detailMessage = Only unique olap table could be updated.
UPDATE test_log SET op_id = 3331 WHERE op_id = 333;


-- Doris: UNIQUE KEY (deduplicating) table for orders.
-- Variant 1: no replication_num, so the default of three replicas applies.
CREATE TABLE sales_order
(
    orderid  BIGINT,
    status   TINYINT,
    username VARCHAR(32),
    amount   BIGINT DEFAULT '0'
)
UNIQUE KEY(orderid)
COMMENT '明细去重模型'
DISTRIBUTED BY HASH(orderid) BUCKETS 10;

-- Variant 2: Doris creates three replicas by default; when the deployment has
-- only one BE, a single replica must be requested explicitly.
-- IF NOT EXISTS keeps the script runnable when variant 1 already succeeded.
CREATE TABLE IF NOT EXISTS sales_order
(
    orderid  BIGINT,
    status   TINYINT,
    username VARCHAR(32),
    amount   BIGINT DEFAULT '0'
)
UNIQUE KEY(orderid)
COMMENT '明细去重模型'
DISTRIBUTED BY HASH(orderid) BUCKETS 10
PROPERTIES
(
    "replication_num" = "1" -- use a single replica (the default is three)
);
-- Only tables using the UNIQUE data model accept UPDATE statements.

-- Insert one order, then change its amount through the unique key.
INSERT INTO sales_order (orderid, status, username, amount)
VALUES (111, 2, 'yand', 999);

UPDATE sales_order
SET amount = 991
WHERE orderid = 111;

-- Doris cluster administration: list FE and BE nodes, then register a BE.
-- Each statement is terminated so the three can be pasted together.
SHOW FRONTENDS;

SHOW BACKENDS;

ALTER SYSTEM ADD BACKEND "192.168.43.199:9050";

-- Doris: detail-model (DUPLICATE KEY) table of visitor sessions.
-- The column COMMENT strings now describe the columns they are attached to.
CREATE TABLE session_data
(
    `visitorid` INT NOT NULL COMMENT "访客id",
    `sessionid` INT NOT NULL COMMENT "会话id",
    `city`      VARCHAR(32)  COMMENT "城市",
    `ip`        VARCHAR(32)  COMMENT "ip地址"
)
DUPLICATE KEY(`visitorid`, `sessionid`)
DISTRIBUTED BY HASH(`sessionid`, `visitorid`) BUCKETS 10
PROPERTIES
(
    "replication_num" = "1" -- use a single replica (the default is three)
);


-- Doris: small single-bucket detail table plus two sample rows.
CREATE TABLE `doris_test`
(
    `id`   INT         NULL COMMENT "id",
    `name` VARCHAR(40) NULL COMMENT "名称"
)
ENGINE = OLAP
DUPLICATE KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES
(
    "replication_num" = "1",
    "in_memory" = "false",
    "storage_format" = "V2"
);

INSERT INTO doris_test (`id`, `name`)
VALUES (2, 'hdfs2');

INSERT INTO doris_test (`id`, `name`)
VALUES (3, 'hdfs5');


-- MySQL: source table captured by the Flink mysql-cdc connector.
-- siteid is the primary key.
CREATE TABLE `mysql_pv` (
    `siteid`   int(11)     NOT NULL DEFAULT '10',
    `citycode` smallint(6) DEFAULT NULL,
    `username` varchar(32) DEFAULT '',
    `pv`       bigint(20)  DEFAULT '0',
    PRIMARY KEY (`siteid`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;


-- Doris: aggregate-model target table for page views.
-- pv is declared with SUM, so rows sharing the same
-- (siteid, citycode, username) key have their pv values added together.
CREATE TABLE doris_pv
(
    siteid   INT         DEFAULT '10',
    citycode SMALLINT,
    username VARCHAR(32) DEFAULT '',
    pv       BIGINT SUM  DEFAULT '0'
)
AGGREGATE KEY(siteid, citycode, username)
DISTRIBUTED BY HASH(siteid) BUCKETS 10
PROPERTIES
(
    "replication_num" = "1"
);


-- Flink SQL: CDC source over MySQL table demo.mysql_pv.
-- Variant 1 reaches MySQL through its compose service name.
CREATE TABLE mysql_pv_source (
    siteid   INT,
    citycode SMALLINT,
    username STRING,
    pv       BIGINT,
    PRIMARY KEY (siteid) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'mysql',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'demo',
    'table-name' = 'mysql_pv'
);

-- Variant 2 reaches MySQL through its fixed bridge-network address instead.
-- The table name is the same, so the earlier definition is dropped first;
-- otherwise this CREATE fails because the table already exists.
DROP TABLE IF EXISTS mysql_pv_source;

CREATE TABLE mysql_pv_source (
    siteid   INT,
    citycode SMALLINT,
    username STRING,
    pv       BIGINT,
    PRIMARY KEY (siteid) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.43.205',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'demo',
    'table-name' = 'mysql_pv'
);



-- Flink SQL: Doris table wudl_db.doris_pv exposed through the doris
-- connector, addressed via the FE at 192.168.43.198:8030.
CREATE TABLE doris_pv_sink
(
    siteid   INT,
    citycode SMALLINT,
    username STRING,
    pv       BIGINT
)
WITH
(
    'connector' = 'doris',
    'fenodes' = '192.168.43.198:8030',
    'table.identifier' = 'wudl_db.doris_pv',
    'sink.batch.size' = '2',
    'sink.batch.interval' = '1',
    'username' = 'root',
    'password' = ''
);

-- Start the synchronisation job from the CDC source into Doris.
INSERT INTO doris_pv_sink
SELECT
    siteid,
    citycode,
    username,
    pv
FROM mysql_pv_source;

-- Inspect both ends of the pipeline.
SELECT siteid, citycode, username, pv FROM mysql_pv_source;
SELECT siteid, citycode, username, pv FROM doris_pv_sink;

-- Flink SQL: second connector table over the same Doris table
-- (wudl_db.doris_pv), addressed through a different FE endpoint.
-- NOTE(review): 192.168.43.138 is not one of the addresses assigned in the
-- compose file (doris-fe is 192.168.43.198); presumably the host machine's
-- address - confirm before use.
CREATE TABLE doris_pv_sink3
(
    siteid   INT,
    citycode SMALLINT,
    username STRING,
    pv       BIGINT
)
WITH
(
    'connector' = 'doris',
    'fenodes' = '192.168.43.138:8030',
    'table.identifier' = 'wudl_db.doris_pv',
    'sink.batch.size' = '2',
    'sink.batch.interval' = '1',
    'username' = 'root',
    'password' = ''
);

-- MySQL: sample rows for the CDC source table.
-- siteid is the primary key of mysql_pv, so each row needs its own value;
-- reusing siteid = 1 would make the second INSERT fail with a duplicate key.
INSERT INTO mysql_pv (siteid, citycode, username, pv)
VALUES (1, 1, 'guandong', 10);

INSERT INTO mysql_pv (siteid, citycode, username, pv)
VALUES (2, 1, 'shenzheng', 10);

-- Flink SQL: start one synchronisation job per sink table.
INSERT INTO doris_pv_sink
SELECT siteid, citycode, username, pv
FROM mysql_pv_source;

INSERT INTO doris_pv_sink3
SELECT siteid, citycode, username, pv
FROM mysql_pv_source;


# Copy the Flink Doris connector jar from the host into /opt/flink/lib/ of the sql-client-1 container.
docker cp D:/flink-1.13.2/lib/flink-doris-connector-1.13_2.12-1.0.3.jar sql-client-1:/opt/flink/lib/

This is not the full help, this menu is stripped into categories.
Use "--help category" to get an overview of all categories.
For all options use the manual or "--help all".
$ curl -i http://localhost:8030/api/backends?is_alive=truefailed
curl: (7) Failed to connect to localhost port 8030: Connection refused
$ curl -i http://localhost:8030
curl: (7) Failed to connect to localhost port 8030: Connection refused
$ curl -i http://localhost:8099
curl: (7) Failed to connect to localhost port 8099: Connection refused
$ curl -i http://192.168.43.198:8030
^C
$ ^C
// 在 docker 容器里面可以用 curl 命令来判断网络及端口是否连通
// 如果容器内访问不了 doris 服务节点(即容器内 ping 不通宿主机),需要让各容器加入同一个 bridge 网络,然后通过容器在该桥接网络中的 ip 直接访问其服务

docker inspect <容器名>   # 查看容器的 ip 及子网,方便排查各容器之间网络不通的原因

//root命令进入容器
docker exec -ti -u root bf6b91b20f5df94ada0b2d22ca023cf2dff2e20839ccc04e60d8bf9d9634f60b bash

-- MySQL: bulk-load test rows into mysql_pv so the CDC pipeline has data to sync.
-- Inserts one row for every i from 20 up to 6199, using i as siteid,
-- citycode and pv and the fixed username 'guandong'.
DELIMITER $$

-- Drop any earlier version so the script can be re-run.
DROP PROCEDURE IF EXISTS `demo`.`test3`$$

CREATE PROCEDURE `demo`.`test3`()
BEGIN
    DECLARE i INT DEFAULT 20;

    WHILE i < 6200 DO
        INSERT INTO mysql_pv (siteid, citycode, username, pv)
        VALUES (i, i, 'guandong', i);
        SET i = i + 1;
    END WHILE;
END$$

-- Restore the default delimiter; without this the CALL below is never
-- terminated because the client still waits for $$.
DELIMITER ;

CALL `demo`.`test3`();

-- Doris: approximate number of distinct siteid values in doris_pv.
SELECT
    APPROX_COUNT_DISTINCT(siteid)
FROM doris_pv;

「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论