1. Greenplum + Kafka
There are two ways to synchronize data from Kafka to Greenplum:
- Start the gpss service and use gpsscli to register a Kafka load job with gpss (covered in detail below).
- Use the gpkafka utility to complete the steps above in one go, since gpkafka wraps the functionality of gpss and gpsscli.
For connecting Greenplum to Kafka, see the official documentation:
https://gpdb.docs.pivotal.io/5180/greenplum-kafka/load-from-kafka-example.html
2. Installation
2.1. Set up the Kafka environment
2.2. Install gpss
step 1. Install gpss-gpdb6-1.5.3-rhel7-x86_64.gppkg
gppkg -i /home/gpkafka/gpss-gpdb6-1.5.3-rhel7-x86_64.gppkg
==========================================================================
GPSS installation is complete! To proceed, create gpss extension in the
target database with:
"CREATE EXTENSION gpss;"
==========================================================================
20220120:08:45:44:008599 gppkg:tcloud:gpadmin-[INFO]:-gpss-gpdb6-1.5.3-rhel7-x86_64.gppkg successfully installed.
step 2. Install the extension
[gpadmin@tcloud ~]$ psql
psql (9.4.24)
Type "help" for help.
# switch to the target database
gpdb=# \c datacenter
# install gpss
datacenter=# CREATE EXTENSION gpss;
Error 1:
ERROR: could not open extension control file
"/usr/local/greenplum-db-6.13.0/share/postgresql/extension/gpss.control":
No such file or directory
Add the two files gpss.control and gpss--1.0.sql (shipped with the gpss package) under the directory named in the error, /usr/local/greenplum-db-6.13.0/share/postgresql/extension/.
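gpss.control follows the standard PostgreSQL extension control file format. The exact file ships with the gppkg and should be copied from there; the sketch below only illustrates the typical shape of a control file, and its values are assumptions, not the packaged content:

```ini
# gpss.control -- illustrative only; use the file shipped with the gppkg
comment = 'Greenplum Stream Server formatter functions'
default_version = '1.0'           ; matches gpss--1.0.sql
module_pathname = '$libdir/gpss'  ; resolved against lib/postgresql
relocatable = false
```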
# reinstall after adding gpss.control and gpss--1.0.sql
datacenter=# CREATE EXTENSION gpss;
Error 2:
ERROR: could not access file "$libdir/gpfmt_gpss.so": No such file or directory
Add the three files gpfmt_gpss.so, gpfmt_protobuf.so and gpss.so under /usr/local/greenplum-db-6.13.0/lib/postgresql; all three are referenced from gpss--1.0.sql.
# reinstall after adding gpfmt_gpss.so, gpfmt_protobuf.so and gpss.so
datacenter=# CREATE EXTENSION gpss;
CREATE EXTENSION
3. Real-time data processing with Greenplum + Kafka
step 1. Start Kafka
# start ZooKeeper
bin/zkServer.sh start
# start Kafka
bin/kafka-server-start.sh -daemon ../config/server.properties
step 2. Create the gpss extension
Before loading Kafka message data into Greenplum, the Greenplum-Kafka integration formatter functions must be registered in every database into which Kafka data will be written.
[gpadmin@oracle166 ~]$ psql
psql (8.3.23)
Type "help" for help.
test=# CREATE EXTENSION gpss;
step 3. Create a sample table
The Kafka data is in JSON format; a sample record:
{"time":1550198435941,"type":"type_mobileinfo","phone_imei":"861738033581011","phone_imsi":"","phone_mac":"00:27:1c:95:47:09","appkey":"307A5C626E6C2F6472636E6E6A2F736460656473","phone_udid":"8F137BFFB2289784A5EA2DCADCE519C2","phone_udid2":"744DD04CE29652F4F1D2DFFC8D3204A9","appUdid":"D21C76419E54B18DDBB94BF2E6990183","phone_resolution":"1280*720","phone_apn":"","phone_model":"BF T26","phone_firmware_version":"5.1","phone_softversion":"3.19.0","phone_softname":"com.esbook.reader","sdk_version":"3.1.8","cpid":"blp1375_13621_001","currentnetworktype":"wifi","phone_city":"","os":"android","install_path":"\/data\/app\/com.esbook.reader-1\/base.apk","last_cpid":"","package_name":"com.esbook.reader","src_code":"WIFIMAC:00:27:1c:95:47:09"}
The fields package_name, appkey, time, phone_udid, os, idfa, phone_imei, cpid, last_cpid and phone_number are used:
CREATE TABLE tbl_novel_mobile_log (
package_name text,
appkey text,
ts bigint,
phone_udid text,
os character varying(20),
idfa character varying(64),
phone_imei character varying(20),
cpid text,
last_cpid text,
phone_number character varying(20)
) ;
step 4. Create the gpkafka.yaml configuration file
Contents of the gpkafka_mobile_yaml file:
DATABASE: lottu
USER: gpadmin
HOST: oracle166
PORT: 5432
KAFKA:
INPUT:
SOURCE:
BROKERS: kafkaip:9092
TOPIC: mobile_info
COLUMNS:
- NAME: jdata
TYPE: json
FORMAT: json
ERROR_LIMIT: 10
OUTPUT:
TABLE: tbl_novel_mobile_log
MAPPING:
- NAME: package_name
EXPRESSION: (jdata->>'package_name')::text
- NAME: appkey
EXPRESSION: (jdata->>'appkey')::text
- NAME: ts
EXPRESSION: (jdata->>'time')::bigint
- NAME: phone_udid
EXPRESSION: (jdata->>'phone_udid')::text
- NAME: os
EXPRESSION: (jdata->>'os')::text
- NAME: idfa
EXPRESSION: (jdata->>'idfa')::text
- NAME: phone_imei
EXPRESSION: (jdata->>'phone_imei')::text
- NAME: cpid
EXPRESSION: (jdata->>'cpid')::text
- NAME: last_cpid
EXPRESSION: (jdata->>'last_cpid')::text
- NAME: phone_number
EXPRESSION: (jdata->>'phone_number')::text
COMMIT:
MAX_ROW: 1000
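The MAPPING section above drives how gpss extracts each target column from the JSON value. A minimal Python sketch of the same extraction, using a trimmed version of the sample record from step 3 (the helper below is illustrative, not part of gpkafka):

```python
import json

# Trimmed version of the sample record from step 3 (fields not listed here,
# e.g. idfa and phone_number, are simply absent from this message).
record = json.loads("""
{"time": 1550198435941, "package_name": "com.esbook.reader",
 "appkey": "307A5C626E6C2F6472636E6E6A2F736460656473",
 "phone_udid": "8F137BFFB2289784A5EA2DCADCE519C2", "os": "android",
 "phone_imei": "861738033581011", "cpid": "blp1375_13621_001",
 "last_cpid": ""}
""")

# Mirror of the MAPPING entries: target column -> (JSON key, cast),
# e.g. ts comes from (jdata->>'time')::bigint.
mapping = {
    "package_name": ("package_name", str),
    "appkey":       ("appkey", str),
    "ts":           ("time", int),
    "phone_udid":   ("phone_udid", str),
    "os":           ("os", str),
    "idfa":         ("idfa", str),
    "phone_imei":   ("phone_imei", str),
    "cpid":         ("cpid", str),
    "last_cpid":    ("last_cpid", str),
    "phone_number": ("phone_number", str),
}

row = {}
for column, (key, cast) in mapping.items():
    value = record.get(key)      # a missing key becomes NULL, like ->> does
    row[column] = cast(value) if value is not None else None
print(row)
```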
step 5. Create the topic
bin/kafka-topics.sh --create --zookeeper kafkaIp:2181 --replication-factor 1 --partitions 1 --topic mobile_info
step 6. Start a Kafka console producer and publish records
bin/kafka-console-producer.sh --broker-list kafkaIP:9092 --topic mobile_info
>{"time":1550198435941,"type":"type_mobileinfo","phone_imei":"861738033581011","phone_imsi":"","phone_mac":"00:27:1c:95:47:09","appkey":"307A5C626E6C2F6472636E6E6A2F736460656473","phone_udid":"8F137BFFB2289784A5EA2DCADCE519C2","phone_udid2":"744DD04CE29652F4F1D2DFFC8D3204A9","appUdid":"D21C76419E54B18DDBB94BF2E6990183","phone_resolution":"1280*720","phone_apn":"","phone_model":"BF T26","phone_firmware_version":"5.1","phone_softversion":"3.19.0","phone_softname":"com.esbook.reader","sdk_version":"3.1.8","cpid":"blp1375_13621_001","currentnetworktype":"wifi","phone_city":"","os":"android","install_path":"\/data\/app\/com.esbook.reader-1\/base.apk","last_cpid":"","package_name":"com.esbook.reader","src_code":"WIFIMAC:00:27:1c:95:47:09"}
{"time":1550198437885,"type":"type_mobileinfo","phone_imei":"862245038046551","phone_imsi":"","phone_mac":"02:00:00:00:00:00","appkey":"307A5C626F2F76646B74606F2F736460656473","phone_udid":"A3BB70A0218AEFC7908B1D79C0C02D77","phone_udid2":"E3976E0453010FC7F32B6143AA3A164E","appUdid":"4FBEF77BC076254ED0407CAD653E6954","phone_resolution":"1920*1080","phone_apn":"","phone_model":"Le X620","phone_firmware_version":"6.0","phone_softversion":"1.9.0","phone_softname":"cn.wejuan.reader","sdk_version":"3.1.8","cpid":"blf1298_14411_001","currentnetworktype":"wifi","phone_city":"","os":"android","install_path":"\/data\/app\/cn.wejuan.reader-1\/base.apk","last_cpid":"","package_name":"cn.wejuan.reader","src_code":"ffffffff-9063-8e34-0000-00007efffeff"}
{"time":1550198438311,"type":"type_mobileinfo","phone_number":"","phone_imei":"867520045576831","phone_imsi":"460001122544742","phone_mac":"02:00:00:00:00:00","appkey":"307A5C626E6C2F6472636E6E6A2F736460656473","phone_udid":"A00407EF9D6EBCC207A514CDA452EB76","phone_udid2":"A00407EF9D6EBCC207A514CDA452EB76","appUdid":"1C35633F4EB8218789EFD8666C763485","phone_resolution":"2086*1080","phone_apn":"CMCC","phone_model":"ONEPLUS A6000","phone_firmware_version":"9","phone_softversion":"3.19.0","phone_softname":"com.esbook.reader","sdk_version":"3.1.8","cpid":"blf1298_12242_001","currentnetworktype":"4gnet","phone_city":"","os":"android","install_path":"\/data\/app\/com.esbook.reader-TlgFCk6ANgEDRnXDCem8uQ==\/base.apk","last_cpid":"","package_name":"com.esbook.reader","src_code":"460001122544742"}
{"time":1550198433102,"type":"type_mobileinfo","phone_number":"15077113477","phone_imei":"860364049874919","phone_imsi":"460023771256711","phone_mac":"02:00:00:00:00:00","appkey":"307A5C626E6C2F6472636E6E6A2F736460656473","phone_udid":"EEF566CB5253AA62B653347A203815C3","phone_udid2":"0845931539AE39B3B0D4EB42B85D98EC","appUdid":"9570DCA2D574E6B69B24137035209D42","phone_resolution":"2340*1080","phone_apn":"CHINA MOBILE","phone_model":"PBEM00","phone_firmware_version":"8.1.0","phone_softversion":"3.19.0","phone_softname":"com.esbook.reader","sdk_version":"3.1.8","cpid":"blf1298_12242_001","currentnetworktype":"4gnet","phone_city":"","os":"android","install_path":"\/data\/app\/com.esbook.reader-NBToXQo14TOeNuPxo_aA4w==\/base.apk","last_cpid":"","package_name":"com.esbook.reader","src_code":"13598c2d-efc4-4957-8d4d-22eb145d15fd"}
{"time":1550198440577,"type":"type_mobileinfo","phone_imei":"869800021106037","phone_imsi":"","phone_mac":"2c:5b:b8:fb:79:af","appkey":"307A5C626E6C2F6472636E6E6A2F736460656473","phone_udid":"2BC16C4AC07070BA9608BBD0EE2EE320","phone_udid2":"A7F9FA4772D31FADEECFDB445BA3BEBB","appUdid":"DC6BEE2F6E5D6A133E26131887AE788A","phone_resolution":"960*540","phone_apn":"","phone_model":"OPPO A33","phone_firmware_version":"5.1.1","phone_softversion":"3.19.0","phone_softname":"com.esbook.reader","sdk_version":"3.1.8","cpid":"blp1375_14526_003","currentnetworktype":"wifi","phone_city":"","os":"android","install_path":"\/data\/app\/com.esbook.reader-1\/base.apk","last_cpid":"","package_name":"com.esbook.reader","src_code":"WIFIMAC:2c:5b:b8:fb:79:af"}
{"time":1506944701166,"type":"type_mobileinfo","phone_number":"+8618602699126","phone_imei":"865902038154143","phone_imsi":"460012690618403","phone_mac":"02:00:00:00:00:00","appkey":"307A5C626E6C2F6472636E6E6A2F736460656473","phone_udid":"388015DA70C0AEA6D59D3CE37B0C4BA2","phone_udid2":"388015DA70C0AEA6D59D3CE37B0C4BA2","appUdid":"EC0A105297D55075526018078A4A1B84","phone_resolution":"1920*1080","phone_apn":"中国联通","phone_model":"MI MAX 2","phone_firmware_version":"7.1.1","phone_softversion":"3.19.0","phone_softname":"com.esbook.reader","sdk_version":"3.1.8","cpid":"blf1298_10928_001","currentnetworktype":"wifi","phone_city":"","os":"android","install_path":"\/data\/app\/com.esbook.reader-1\/base.apk","last_cpid":"","package_name":"com.esbook.reader","src_code":"460012690618403"}
step 7. Run gpkafka to load the data
[gpadmin@oracle166 ~]$ gpkafka load --quit-at-eof ./gpkafka_mobile_yaml
PartitionID StartTime EndTime BeginOffset EndOffset
0 2019-02-27T09:26:27.989312Z 2019-02-27T09:26:27.99517Z 0 5
Job dcd0d159282c0ef39f182cabeef23ee6 stopped normally at 2019-02-27 09:26:29.442874281 +0000 UTC
step 8. Check the progress of the load (optional)
[gpadmin@oracle166 ~]$ gpkafka check ./gpkafka_mobile_yaml
PartitionID StartTime EndTime BeginOffset EndOffset
0 2019-02-27T09:26:27.989312Z 2019-02-27T09:26:27.99517Z 0 5
step 9. Query the table
[gpadmin@oracle166 ~]$ psql
psql (8.3.23)
Type "help" for help.
lottu=# select * from tbl_novel_mobile_log ;
   package_name    |                  appkey                  |      ts       |            phone_udid            |   os    | idfa |   phone_imei    |       cpid        | last_cpid | phone_number
-------------------+------------------------------------------+---------------+----------------------------------+---------+------+-----------------+-------------------+-----------+--------------
 com.esbook.reader | 307A5C626E6C2F6472636E6E6A2F736460656473 | 1550198435941 | 8F137BFFB2289784A5EA2DCADCE519C2 | android |      | 861738033581011 | blp1375_13621_001 |           |
 com.esbook.reader | 307A5C626E6C2F6472636E6E6A2F736460656473 | 1550198438311 | A00407EF9D6EBCC207A514CDA452EB76 | android |      | 867520045576831 | blf1298_12242_001 |           |
 com.esbook.reader | 307A5C626E6C2F6472636E6E6A2F736460656473 | 1550198433102 | EEF566CB5253AA62B653347A203815C3 | android |      | 860364049874919 | blf1298_12242_001 |           | 15077113477
 com.esbook.reader | 307A5C626E6C2F6472636E6E6A2F736460656473 | 1550198440577 | 2BC16C4AC07070BA9608BBD0EE2EE320 | android |      | 869800021106037 | blp1375_14526_003 |           |
 cn.wejuan.reader  | 307A5C626F2F76646B74606F2F736460656473   | 1550198437885 | A3BB70A0218AEFC7908B1D79C0C02D77 | android |      | 862245038046551 | blf1298_14411_001 |           |
(5 rows)
4. Loading data from Kafka into Greenplum with gpss
4.1. Create the configuration
step 1. Prepare a Kafka producer and consumer
# Kafka producer:
kafka-console-producer.sh --broker-list 192.168.12.115:7776 --topic gpss_test
# Kafka consumer:
kafka-console-consumer.sh --bootstrap-server 192.168.12.115:7776 --topic gpss_test --from-beginning
step 2. Configure the host and port of the gpss service
gpss4ic.json:
{
"ListenAddress": {
"Host": "",
"Port": 50007
},
"Gpfdist": {
"Host": "",
"Port": 8319,
"ReuseTables": false
}
}
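When hand-editing gpss4ic.json it is easy to break the JSON; a small convenience check like the one below (not part of gpss) catches that before starting the service:

```python
import json

# Parse the config (inlined here instead of read from disk) and verify
# the two listener sections and their ports.
config_text = """
{
  "ListenAddress": {"Host": "", "Port": 50007},
  "Gpfdist": {"Host": "", "Port": 8319, "ReuseTables": false}
}
"""
config = json.loads(config_text)
assert {"ListenAddress", "Gpfdist"} <= config.keys()
print(config["ListenAddress"]["Port"], config["Gpfdist"]["Port"])
```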
step 3. Configuration files for loading Kafka data into Greenplum
- Configuration file kafka_testdata_delimited.yaml for loading "|"-delimited stream data:
DATABASE: yloms
USER: gpss_usr
PASSWORD: gpss_usr
HOST: mdw
PORT: 5432
VERSION: 2
KAFKA:
  INPUT:
    SOURCE:
      BROKERS: 192.168.12.115:7776
      TOPIC: gpss_test
    VALUE:
      COLUMNS:
        - NAME: tid
          TYPE: integer
        - NAME: tcode
          TYPE: varchar
        - NAME: tname
          TYPE: varchar
      FORMAT: delimited
      DELIMITED_OPTION:
        DELIMITER: "|"
    ERROR_LIMIT: 25
  OUTPUT:
    SCHEMA: ylorder
    TABLE: test_heap
  METADATA:
    SCHEMA: ylorder
  COMMIT:
    MINIMAL_INTERVAL: 2000
  POLL:
    BATCHSIZE: 100
    TIMEOUT: 3000
- Configuration file kafka_testdata_json.yaml for loading JSON-format stream data:
DATABASE: yloms
USER: gpss_usr
PASSWORD: gpss_usr
HOST: mdw
PORT: 5432
VERSION: 2
KAFKA:
  INPUT:
    SOURCE:
      BROKERS: 192.168.12.115:7776
      TOPIC: gpss_test
    VALUE:
      COLUMNS:
        - NAME: jdata
          TYPE: json
      FORMAT: json
    ERROR_LIMIT: 25
  OUTPUT:
    SCHEMA: ylorder
    TABLE: test_heap
    MAPPING:
      - NAME: tid
        EXPRESSION: (jdata->>'tid')::int
      - NAME: tcode
        EXPRESSION: (jdata->>'tcode')::varchar
      - NAME: tname
        EXPRESSION: (jdata->>'tname')::varchar
  METADATA:
    SCHEMA: ylorder
  COMMIT:
    MINIMAL_INTERVAL: 2000
  POLL:
    BATCHSIZE: 100
    TIMEOUT: 3000
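The delimited configuration splits each Kafka message on "|" and casts the pieces into the (tid integer, tcode varchar, tname varchar) columns. A Python sketch of that parsing (the sample message value is made up):

```python
# Mirror of FORMAT: delimited with DELIMITER: "|": split the raw message
# into the three declared columns and apply the declared type casts.
def parse_delimited(msg: str) -> dict:
    tid, tcode, tname = msg.split("|")
    return {"tid": int(tid), "tcode": tcode, "tname": tname}

row = parse_delimited("1001|C001|widget")
print(row)
```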
4.2. Start the job
step 1. Use gpss for ETL loading:
Start the gpss service:
gpss gpss4ic.json
Log output:
20200225:22:08:21 gpss:gpadmin:greenplum-001:010656-[INFO]:-using config file: gpss4ic.json
20200225:22:08:21 gpss:gpadmin:greenplum-001:010656-[INFO]:-config file content: {
"ListenAddress": {
"Host": "mdw",
"Port": 50007,
"Certificate": {
"CertFile": "",
"KeyFile": "",
"CAFile": ""
}
},
"Gpfdist": {
"Host": "mdw",
"Port": 8319,
"ReuseTables": false,
"Certificate": {
"CertFile": "",
"KeyFile": "",
"CAFile": ""
},
"BindAddress": "0.0.0.0"
}
}
20200225:22:08:21 gpss:gpadmin:greenplum-001:010656-[INFO]:-gpss-listen-address-prefix: mdw:50007
20200225:22:08:21 gpss:gpadmin:greenplum-001:010656-[INFO]:-gpss will use random external table name, external table won't get reused
20200225:22:08:21 gpss:gpadmin:greenplum-001:010656-[INFO]:-gpfdist listening on 0.0.0.0:8319
20200225:22:08:21 gpss:gpadmin:greenplum-001:010656-[INFO]:-gpss listening on mdw:50007
step 2. Submit a job:
gpsscli submit --name kafkajson2gp --gpss-port 50007 --gpss-host mdw ./kafka_testdata_json.yaml
Output:
20200225:22:09:16 gpsscli:gpadmin:greenplum-001:010722-[INFO]:-JobID: kafkajson2gp
step 3. List the jobs:
gpsscli list --all --gpss-port 50007 --gpss-host mdw
Output:
JobID GPHost GPPort DataBase Schema Table Topic Status
kafkajson2gp mdw 5432 yloms ylorder test_heap gpss_test JOB_STOPPED
step 4. Start the job:
gpsscli start kafkajson2gp --gpss-port 50007 --gpss-host mdw
Output:
20200225:22:10:24 gpsscli:gpadmin:greenplum-001:010756-[INFO]:-JobID: kafkajson2gp is started
step 5. Check the job:
gpsscli list --all --gpss-port 50007 --gpss-host mdw
Output:
JobID GPHost GPPort DataBase Schema Table Topic Status
kafkajson2gp mdw 5432 yloms ylorder test_heap gpss_test JOB_RUNNING
step 6. Stop the job:
gpsscli stop kafkajson2gp --gpss-port 50007 --gpss-host mdw
Output:
20200225:22:11:04 gpsscli:gpadmin:greenplum-001:010801-[INFO]:-Stop a job: kafkajson2gp, status JOB_STOPPED
4.3. Start the service with gpkafka:
gpkafka load effectively replaces the gpsscli commands for submitting, starting, and managing a job.
gpkafka --config gpss4ic.json load kafka_testdata_json.yaml
Query the offsets stored in the database:
select * from kafka_test.gpkafka_data_from_kafka_12ead185469b45cc8e5be3c9f0ea14a2 limit 10;
5. Real-time data synchronization
5.1.maxwell + Kafka + bireme
Use maxwell + Kafka + bireme to synchronize MySQL data to Greenplum in real time.
maxwell parses the MySQL binlog in real time and sends the resulting JSON-formatted data to Kafka. Kafka serves mainly as a message relay in this architecture. bireme reads the messages from Kafka and applies them to the Greenplum database, synchronizing the data incrementally.

maxwell is an application that reads the MySQL binary log (binlog) in real time, generates JSON-formatted messages, and sends them as a producer to Kafka, Kinesis, RabbitMQ, Redis, Google Cloud Pub/Sub, files or other platforms; of these, Kafka is the message system maxwell supports best. Common use cases include ETL, cache maintenance, collecting table-level DML metrics, incremental feeds into search engines, data-partition migration, and binlog-based rollback during database cutovers.
maxwell is quite active on GitHub; its home page is https://github.com/zendesk/maxwell.
maxwell provides the following main features:
- Supports full-data initialization via SELECT * FROM table.
- Supports GTID, automatically recovering the binlog position after a MySQL failover.
- Can partition data to avoid skew; data sent to Kafka can be partitioned at the database, table or column level.
- Works by impersonating a MySQL slave: it opens a dump-thread connection on the master, receives binlog events, and assembles them into JSON strings according to the schema information; it can accept DDL, XID, row and other event types.
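Each maxwell event shares a common JSON envelope: the database, table, change type, a timestamp, and a data payload carrying the row image. The snippet below parses one such event; the concrete row values are invented for illustration:

```python
import json

# A typical maxwell insert event (envelope layout as documented by maxwell;
# the database, table and row values here are made up).
event = json.loads("""
{"database": "shop", "table": "orders", "type": "insert",
 "ts": 1550198435, "xid": 8721,
 "data": {"id": 42, "status": "NEW", "amount": "9.90"}}
""")

target = None
if event["type"] in ("insert", "update", "delete"):
    # Route the change to its target table and read the row payload.
    target = f'{event["database"]}.{event["table"]}'
    print(target, event["type"], event["data"]["id"])
```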
bireme is an incremental synchronization tool for Greenplum data warehouses. It currently supports MySQL, PostgreSQL and MongoDB data sources, and maxwell + Kafka is one of the supported source types.
As a Kafka consumer, bireme applies the source's change records to Greenplum using a DELETE + COPY strategy, which is faster and performs better than row-by-row INSERT, UPDATE and DELETE. bireme's main characteristic is micro-batch loading (the default load delay is 10 seconds) to improve synchronization performance, but it requires every synchronized table to have a primary key in both the source and the target database. The bireme home page is https://github.com/HashDataInc/bireme/.
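The DELETE + COPY idea can be sketched as follows (an assumed simplification of bireme's batching, not its actual code): within a micro-batch only the last change per primary key matters, so the batch can be applied as one DELETE over all affected keys followed by one COPY of the surviving rows:

```python
# Plan a micro-batch of change events as a single DELETE + COPY.
# Each event is (op, pk, row_or_None); tables must have a primary key.
def plan_batch(events):
    latest = {}                          # pk -> last event wins
    for op, pk, row in events:
        latest[pk] = (op, row)
    delete_keys = list(latest)           # DELETE ... WHERE pk IN (...)
    copy_rows = [row for op, row in latest.values() if op != "delete"]
    return delete_keys, copy_rows        # then COPY copy_rows in one shot

keys, rows = plan_batch([
    ("insert", 1, {"id": 1, "v": "a"}),
    ("update", 1, {"id": 1, "v": "b"}),  # supersedes the insert above
    ("delete", 2, None),
])
print(keys, rows)
```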
In this architecture Kafka is the message middleware that bridges maxwell and bireme; both the upstream and downstream components depend on it.
5.2. Canal + Kafka + ClientAdapter

Canal is a component open-sourced by Alibaba that is similar to maxwell in both functionality and implementation. Its main purpose is incremental log parsing for MySQL databases, providing incremental data subscription and consumption. Its working principle is fairly simple:
- Canal emulates the MySQL slave interaction protocol, disguising itself as a MySQL slave and sending the dump protocol to the MySQL master.
- The MySQL master receives the dump request and starts pushing the binary log to the slave (i.e., Canal).
- Canal parses the binary log objects (raw byte streams).
A Canal Server represents one running Canal instance and corresponds to one JVM. An Instance corresponds to one data queue; one Server hosts 1..n Instances. Within an Instance, EventParser connects to the data source, interacting with the master as a simulated slave and parsing the protocol. EventSink is the connector between Parser and Store, performing data filtering, processing and distribution. EventStore stores the data. MetaManager manages incremental subscription and consumption metadata.

Since version 1.1.1, Canal supports delivering the binlog data received by the Canal Server directly to a message queue; Kafka and RocketMQ are supported by default. Early versions of Canal only provided a Client API, so users had to write their own client programs to implement the consumption logic. Version 1.1.1 also added client-adapter, which provides ready-made adapters for landing the data on the client side, along with startup support.




