
Greenplum + Kafka Real-Time Data Processing

Original by sg1234 · 2023-07-16

1.Greenplum+Kafka

There are two ways to synchronize data from Kafka to Greenplum:

  • Start the gpss service and use gpsscli to register a Kafka load job with gpss (the focus of this article)
  • Use the gpkafka utility, which wraps the functionality of gpss and gpsscli, to complete the above steps quickly

To connect Greenplum with Kafka, refer to the official documentation:
https://gpdb.docs.pivotal.io/5180/greenplum-kafka/load-from-kafka-example.html

2.Installation

2.1.Install the Kafka environment

2.2.Install gpss

step 1.Install gpss-gpdb6-1.5.3-rhel7-x86_64.gppkg

gppkg -i /home/gpkafka/gpss-gpdb6-1.5.3-rhel7-x86_64.gppkg

==========================================================================
GPSS installation is complete! To proceed, create gpss extension in the
target database with:
    "CREATE EXTENSION gpss;"
==========================================================================
20220120:08:45:44:008599 gppkg:tcloud:gpadmin-[INFO]:-gpss-gpdb6-1.5.3-rhel7-x86_64.gppkg successfully installed.

step 2.Install the extension

[gpadmin@tcloud ~]$ psql
psql (9.4.24)
Type "help" for help.
# switch to the target database
gpdb=# \c datacenter
# install gpss
datacenter=# CREATE EXTENSION gpss;

Error 1️⃣

ERROR:  could not open extension control file 
"/usr/local/greenplum-db-6.13.0/share/postgresql/extension/gpss.control":
No such file or directory

Add two files, gpss.control and gpss--1.0.sql, under the directory named in the error, /usr/local/greenplum-db-6.13.0/share/postgresql/extension/.
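
For orientation, a PostgreSQL extension control file has the following general shape. The real gpss.control ships with the gpss package, so every value below is an illustrative assumption rather than the packaged content:

# gpss.control -- illustrative sketch only; copy the real file from the gpss package
comment = 'Greenplum Streaming Server formatter functions'  # assumed description
default_version = '1.0'           # must match the gpss--1.0.sql version suffix
module_pathname = '$libdir/gpss'  # resolves to gpss.so under lib/postgresql
relocatable = true                # assumed; verify against the packaged file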

# after adding gpss.control and gpss--1.0.sql, retry the installation
datacenter=# CREATE EXTENSION gpss;

Error 2️⃣

ERROR:  could not access file "$libdir/gpfmt_gpss.so": No such file or directory

Add three files, gpfmt_gpss.so, gpfmt_protobuf.so, and gpss.so, under the directory /usr/local/greenplum-db-6.13.0/lib/postgresql; all three shared libraries are referenced by gpss--1.0.sql.

# after adding gpfmt_gpss.so, gpfmt_protobuf.so, and gpss.so, retry the installation
datacenter=# CREATE EXTENSION gpss;
CREATE EXTENSION

3.Real-time data processing with Greenplum + Kafka

step 1.Start Kafka

# start ZooKeeper
bin/zkServer.sh start

# start Kafka
bin/kafka-server-start.sh -daemon config/server.properties

step 2.Create the gpss extension
Before Kafka message data can be loaded into Greenplum, the Greenplum-Kafka integration formatter functions must be registered in every database into which Kafka data will be written.

[gpadmin@oracle166 ~]$ psql
psql (8.3.23)
Type "help" for help.

test=# CREATE EXTENSION gpss;

step 3.Create a sample table
The Kafka data is in JSON format; a sample record:

{"time":1550198435941,"type":"type_mobileinfo","phone_imei":"861738033581011","phone_imsi":"","phone_mac":"00:27:1c:95:47:09","appkey":"307A5C626E6C2F6472636E6E6A2F736460656473","phone_udid":"8F137BFFB2289784A5EA2DCADCE519C2","phone_udid2":"744DD04CE29652F4F1D2DFFC8D3204A9","appUdid":"D21C76419E54B18DDBB94BF2E6990183","phone_resolution":"1280*720","phone_apn":"","phone_model":"BF T26","phone_firmware_version":"5.1","phone_softversion":"3.19.0","phone_softname":"com.esbook.reader","sdk_version":"3.1.8","cpid":"blp1375_13621_001","currentnetworktype":"wifi","phone_city":"","os":"android","install_path":"\/data\/app\/com.esbook.reader-1\/base.apk","last_cpid":"","package_name":"com.esbook.reader","src_code":"WIFIMAC:00:27:1c:95:47:09"}

Of these, we use the package_name, appkey, time, phone_udid, os, idfa, phone_imei, cpid, last_cpid, and phone_number fields:

CREATE TABLE tbl_novel_mobile_log (
    package_name text,
    appkey text,
    ts bigint,
    phone_udid text,
    os character varying(20),
    idfa character varying(64),
    phone_imei character varying(20),
    cpid text,
    last_cpid text,
    phone_number character varying(20)
) ;
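
The DDL above has no distribution clause, so Greenplum falls back to a default distribution key (typically the first column). As a hedged variant, if phone_udid is close to unique in your data, declaring the key explicitly gives more predictable segment balance (the choice of phone_udid is an assumption, not from the original article):

-- Same table with an explicit distribution key; adjust the column to your data.
CREATE TABLE tbl_novel_mobile_log (
    package_name text,
    appkey text,
    ts bigint,
    phone_udid text,
    os character varying(20),
    idfa character varying(64),
    phone_imei character varying(20),
    cpid text,
    last_cpid text,
    phone_number character varying(20)
) DISTRIBUTED BY (phone_udid);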

step 4.Create the gpkafka.yaml configuration file

Contents of the gpkafka_mobile_yaml file:
DATABASE: lottu
USER: gpadmin
HOST: oracle166
PORT: 5432
KAFKA:
   INPUT:
     SOURCE:
        BROKERS: kafkaip:9092
        TOPIC: mobile_info
     COLUMNS:
        - NAME: jdata
          TYPE: json
     FORMAT: json
     ERROR_LIMIT: 10
   OUTPUT:
     TABLE: tbl_novel_mobile_log
     MAPPING:
        - NAME: package_name
          EXPRESSION: (jdata->>'package_name')::text
        - NAME: appkey
          EXPRESSION: (jdata->>'appkey')::text
        - NAME: ts
          EXPRESSION: (jdata->>'time')::bigint
        - NAME: phone_udid
          EXPRESSION: (jdata->>'phone_udid')::text
        - NAME: os
          EXPRESSION: (jdata->>'os')::text
        - NAME: idfa
          EXPRESSION: (jdata->>'idfa')::text
        - NAME: phone_imei
          EXPRESSION: (jdata->>'phone_imei')::text
        - NAME: cpid
          EXPRESSION: (jdata->>'cpid')::text
        - NAME: last_cpid
          EXPRESSION: (jdata->>'last_cpid')::text
        - NAME: phone_number
          EXPRESSION: (jdata->>'phone_number')::text
   COMMIT:
     MAX_ROW: 1000

step 5.Create the topic

bin/kafka-topics.sh --create --zookeeper kafkaIp:2181 --replication-factor 1 --partitions 1  --topic mobile_info
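
To confirm the topic exists before producing to it, the matching list command for this Kafka version (which still uses the --zookeeper flag) is:

bin/kafka-topics.sh --list --zookeeper kafkaIp:2181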

step 6.Start a Kafka console producer and publish some records

bin/kafka-console-producer.sh  --broker-list kafkaIP:9092 --topic mobile_info

>{"time":1550198435941,"type":"type_mobileinfo","phone_imei":"861738033581011","phone_imsi":"","phone_mac":"00:27:1c:95:47:09","appkey":"307A5C626E6C2F6472636E6E6A2F736460656473","phone_udid":"8F137BFFB2289784A5EA2DCADCE519C2","phone_udid2":"744DD04CE29652F4F1D2DFFC8D3204A9","appUdid":"D21C76419E54B18DDBB94BF2E6990183","phone_resolution":"1280*720","phone_apn":"","phone_model":"BF T26","phone_firmware_version":"5.1","phone_softversion":"3.19.0","phone_softname":"com.esbook.reader","sdk_version":"3.1.8","cpid":"blp1375_13621_001","currentnetworktype":"wifi","phone_city":"","os":"android","install_path":"\/data\/app\/com.esbook.reader-1\/base.apk","last_cpid":"","package_name":"com.esbook.reader","src_code":"WIFIMAC:00:27:1c:95:47:09"}
{"time":1550198437885,"type":"type_mobileinfo","phone_imei":"862245038046551","phone_imsi":"","phone_mac":"02:00:00:00:00:00","appkey":"307A5C626F2F76646B74606F2F736460656473","phone_udid":"A3BB70A0218AEFC7908B1D79C0C02D77","phone_udid2":"E3976E0453010FC7F32B6143AA3A164E","appUdid":"4FBEF77BC076254ED0407CAD653E6954","phone_resolution":"1920*1080","phone_apn":"","phone_model":"Le X620","phone_firmware_version":"6.0","phone_softversion":"1.9.0","phone_softname":"cn.wejuan.reader","sdk_version":"3.1.8","cpid":"blf1298_14411_001","currentnetworktype":"wifi","phone_city":"","os":"android","install_path":"\/data\/app\/cn.wejuan.reader-1\/base.apk","last_cpid":"","package_name":"cn.wejuan.reader","src_code":"ffffffff-9063-8e34-0000-00007efffeff"}
{"time":1550198438311,"type":"type_mobileinfo","phone_number":"","phone_imei":"867520045576831","phone_imsi":"460001122544742","phone_mac":"02:00:00:00:00:00","appkey":"307A5C626E6C2F6472636E6E6A2F736460656473","phone_udid":"A00407EF9D6EBCC207A514CDA452EB76","phone_udid2":"A00407EF9D6EBCC207A514CDA452EB76","appUdid":"1C35633F4EB8218789EFD8666C763485","phone_resolution":"2086*1080","phone_apn":"CMCC","phone_model":"ONEPLUS A6000","phone_firmware_version":"9","phone_softversion":"3.19.0","phone_softname":"com.esbook.reader","sdk_version":"3.1.8","cpid":"blf1298_12242_001","currentnetworktype":"4gnet","phone_city":"","os":"android","install_path":"\/data\/app\/com.esbook.reader-TlgFCk6ANgEDRnXDCem8uQ==\/base.apk","last_cpid":"","package_name":"com.esbook.reader","src_code":"460001122544742"}
{"time":1550198433102,"type":"type_mobileinfo","phone_number":"15077113477","phone_imei":"860364049874919","phone_imsi":"460023771256711","phone_mac":"02:00:00:00:00:00","appkey":"307A5C626E6C2F6472636E6E6A2F736460656473","phone_udid":"EEF566CB5253AA62B653347A203815C3","phone_udid2":"0845931539AE39B3B0D4EB42B85D98EC","appUdid":"9570DCA2D574E6B69B24137035209D42","phone_resolution":"2340*1080","phone_apn":"CHINA MOBILE","phone_model":"PBEM00","phone_firmware_version":"8.1.0","phone_softversion":"3.19.0","phone_softname":"com.esbook.reader","sdk_version":"3.1.8","cpid":"blf1298_12242_001","currentnetworktype":"4gnet","phone_city":"","os":"android","install_path":"\/data\/app\/com.esbook.reader-NBToXQo14TOeNuPxo_aA4w==\/base.apk","last_cpid":"","package_name":"com.esbook.reader","src_code":"13598c2d-efc4-4957-8d4d-22eb145d15fd"}
{"time":1550198440577,"type":"type_mobileinfo","phone_imei":"869800021106037","phone_imsi":"","phone_mac":"2c:5b:b8:fb:79:af","appkey":"307A5C626E6C2F6472636E6E6A2F736460656473","phone_udid":"2BC16C4AC07070BA9608BBD0EE2EE320","phone_udid2":"A7F9FA4772D31FADEECFDB445BA3BEBB","appUdid":"DC6BEE2F6E5D6A133E26131887AE788A","phone_resolution":"960*540","phone_apn":"","phone_model":"OPPO A33","phone_firmware_version":"5.1.1","phone_softversion":"3.19.0","phone_softname":"com.esbook.reader","sdk_version":"3.1.8","cpid":"blp1375_14526_003","currentnetworktype":"wifi","phone_city":"","os":"android","install_path":"\/data\/app\/com.esbook.reader-1\/base.apk","last_cpid":"","package_name":"com.esbook.reader","src_code":"WIFIMAC:2c:5b:b8:fb:79:af"}
{"time":1506944701166,"type":"type_mobileinfo","phone_number":"+8618602699126","phone_imei":"865902038154143","phone_imsi":"460012690618403","phone_mac":"02:00:00:00:00:00","appkey":"307A5C626E6C2F6472636E6E6A2F736460656473","phone_udid":"388015DA70C0AEA6D59D3CE37B0C4BA2","phone_udid2":"388015DA70C0AEA6D59D3CE37B0C4BA2","appUdid":"EC0A105297D55075526018078A4A1B84","phone_resolution":"1920*1080","phone_apn":"中国联通","phone_model":"MI MAX 2","phone_firmware_version":"7.1.1","phone_softversion":"3.19.0","phone_softname":"com.esbook.reader","sdk_version":"3.1.8","cpid":"blf1298_10928_001","currentnetworktype":"wifi","phone_city":"","os":"android","install_path":"\/data\/app\/com.esbook.reader-1\/base.apk","last_cpid":"","package_name":"com.esbook.reader","src_code":"460012690618403"}

step 7.Run gpkafka to load the data

[gpadmin@oracle166 ~]$ gpkafka load --quit-at-eof ./gpkafka_mobile_yaml
PartitionID StartTime   EndTime BeginOffset EndOffset
0   2019-02-27T09:26:27.989312Z 2019-02-27T09:26:27.99517Z  0   5
Job dcd0d159282c0ef39f182cabeef23ee6 stopped normally at 2019-02-27 09:26:29.442874281 +0000 UTC 

step 8.Check the progress of the load operation (optional)

[gpadmin@oracle166 ~]$ gpkafka check ./gpkafka_mobile_yaml
PartitionID StartTime   EndTime BeginOffset EndOffset
0   2019-02-27T09:26:27.989312Z 2019-02-27T09:26:27.99517Z  0   5

step 9.Query the table

[gpadmin@oracle166 ~]$ psql
psql (8.3.23)
Type "help" for help.

lottu=# select * from tbl_novel_mobile_log ;
   package_name    |                  appkey                  |      ts       |            phone_udid            |   os    | idfa |   phone_imei    |       cpid        | last_cpid | phone_number
-------------------+------------------------------------------+---------------+----------------------------------+---------+------+-----------------+-------------------+-----------+--------------
 com.esbook.reader | 307A5C626E6C2F6472636E6E6A2F736460656473 | 1550198435941 | 8F137BFFB2289784A5EA2DCADCE519C2 | android |      | 861738033581011 | blp1375_13621_001 |           |
 com.esbook.reader | 307A5C626E6C2F6472636E6E6A2F736460656473 | 1550198438311 | A00407EF9D6EBCC207A514CDA452EB76 | android |      | 867520045576831 | blf1298_12242_001 |           |
 com.esbook.reader | 307A5C626E6C2F6472636E6E6A2F736460656473 | 1550198433102 | EEF566CB5253AA62B653347A203815C3 | android |      | 860364049874919 | blf1298_12242_001 |           | 15077113477
 com.esbook.reader | 307A5C626E6C2F6472636E6E6A2F736460656473 | 1550198440577 | 2BC16C4AC07070BA9608BBD0EE2EE320 | android |      | 869800021106037 | blp1375_14526_003 |           |
 cn.wejuan.reader  | 307A5C626F2F76646B74606F2F736460656473   | 1550198437885 | A3BB70A0218AEFC7908B1D79C0C02D77 | android |      | 862245038046551 | blf1298_14411_001 |           |
(5 rows)
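
The ts column stores epoch milliseconds. If you want human-readable event times, a query along the following lines works on Greenplum 6 (PostgreSQL 9.4); it uses only the table and columns created above:

-- Convert epoch milliseconds to a timestamp for readability.
SELECT package_name,
       to_timestamp(ts / 1000.0) AS event_time
FROM   tbl_novel_mobile_log
LIMIT  5;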

4.Consuming data from Kafka and loading it into Greenplum with gpss

4.1.Create the configuration

step 1.Prepare a Kafka producer and consumer

# Kafka producer:
kafka-console-producer.sh --broker-list 192.168.12.115:7776 --topic gpss_test

# Kafka consumer:
kafka-console-consumer.sh --bootstrap-server 192.168.12.115:7776 --topic gpss_test --from-beginning

step 2.Configure the host and port of the gpss service

gpss4ic.json

{
    "ListenAddress": {
        "Host": "",
        "Port": 50007
    },
    "Gpfdist": {
        "Host": "",
        "Port": 8319,
        "ReuseTables": false
    }
}

step 3.Configuration files for loading Kafka data into Greenplum

  1. Configuration file kafka_testdata_delimited.yaml for loading "|"-delimited stream data (a matching sample table and test record are sketched after this list):
    DATABASE: yloms
    USER: gpss_usr
    PASSWORD: gpss_usr
    HOST: mdw
    PORT: 5432
    VERSION: 2
    KAFKA:
      INPUT:
        SOURCE:
          BROKERS: 192.168.12.115:7776
          TOPIC: gpss_test
        VALUE:
          COLUMNS:
            - NAME: tid
              TYPE: integer
            - NAME: tcode
              TYPE: varchar
            - NAME: tname
              TYPE: varchar
          FORMAT: delimited
          DELIMITED_OPTION:
            DELIMITER: "|"
        ERROR_LIMIT: 25
      OUTPUT:
        SCHEMA: ylorder
        TABLE: test_heap
      METADATA:
        SCHEMA: ylorder
      COMMIT:
        MINIMAL_INTERVAL: 2000
      POLL:
        BATCHSIZE: 100
        TIMEOUT: 3000
    
  2. Configuration file kafka_testdata_json.yaml for loading JSON-formatted stream data:
    DATABASE: yloms
    USER: gpss_usr
    PASSWORD: gpss_usr
    HOST: mdw
    PORT: 5432
    VERSION: 2
    KAFKA:
      INPUT:
        SOURCE:
          BROKERS: 192.168.12.115:7776
          TOPIC: gpss_test
        VALUE:
          COLUMNS:
            - NAME: jdata
              TYPE: json
          FORMAT: json
        ERROR_LIMIT: 25
      OUTPUT:
        SCHEMA: ylorder
        TABLE: test_heap
        MAPPING:
          - NAME: tid
            EXPRESSION: (jdata->>'tid')::int
          - NAME: tcode
            EXPRESSION: (jdata->>'tcode')::varchar
          - NAME: tname
            EXPRESSION: (jdata->>'tname')::varchar
      METADATA:
        SCHEMA: ylorder
      COMMIT:
        MINIMAL_INTERVAL: 2000
      POLL:
        BATCHSIZE: 100
        TIMEOUT: 3000
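
Both configuration files target ylorder.test_heap, whose definition is not shown in the original; a minimal sketch consistent with the COLUMNS sections above would be:

-- Hypothetical DDL for the target table implied by the COLUMNS sections.
CREATE TABLE ylorder.test_heap (
    tid   integer,
    tcode varchar,
    tname varchar
);

A line such as 1|A0001|first row typed into the kafka-console-producer session from step 1 then maps onto (tid, tcode, tname).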
    

4.2.Start the job

step 1.Use gpss for the ETL load.
Start the gpss service:

gpss gpss4ic.json

Log output:

20200225:22:08:21 gpss:gpadmin:greenplum-001:010656-[INFO]:-using config file: gpss4ic.json
20200225:22:08:21 gpss:gpadmin:greenplum-001:010656-[INFO]:-config file content: {
        "ListenAddress": {
                "Host": "mdw",
                "Port": 50007,
                "Certificate": {
                        "CertFile": "",
                        "KeyFile": "",
                        "CAFile": ""
                }
        },
        "Gpfdist": {
                "Host": "mdw",
                "Port": 8319,
                "ReuseTables": false,
                "Certificate": {
                        "CertFile": "",
                        "KeyFile": "",
                        "CAFile": ""
                },
                "BindAddress": "0.0.0.0"
        }
}
20200225:22:08:21 gpss:gpadmin:greenplum-001:010656-[INFO]:-gpss-listen-address-prefix: mdw:50007
20200225:22:08:21 gpss:gpadmin:greenplum-001:010656-[INFO]:-gpss will use random external table name, external table won't get reused
20200225:22:08:21 gpss:gpadmin:greenplum-001:010656-[INFO]:-gpfdist listening on 0.0.0.0:8319
20200225:22:08:21 gpss:gpadmin:greenplum-001:010656-[INFO]:-gpss listening on mdw:50007

step 2.Submit a job:

gpsscli submit --name kafkajson2gp --gpss-port 50007 --gpss-host mdw ./kafka_testdata_json.yaml

Output:

20200225:22:09:16 gpsscli:gpadmin:greenplum-001:010722-[INFO]:-JobID: kafkajson2gp

step 3.List the jobs:

gpsscli list --all --gpss-port 50007 --gpss-host mdw

Output:

JobID                               GPHost          GPPort  DataBase        Schema          Table                           Topic           Status
kafkajson2gp                        mdw             5432    yloms           ylorder         test_heap                       gpss_test       JOB_STOPPED

step 4.Start the job:

gpsscli start kafkajson2gp --gpss-port 50007 --gpss-host mdw

Output:

20200225:22:10:24 gpsscli:gpadmin:greenplum-001:010756-[INFO]:-JobID: kafkajson2gp is started

step 5.Check the job status:

gpsscli list --all --gpss-port 50007 --gpss-host mdw

Output:

JobID                               GPHost          GPPort  DataBase        Schema          Table                           Topic           Status
kafkajson2gp                        mdw             5432    yloms           ylorder         test_heap                       gpss_test       JOB_RUNNING

step 6.Stop the job:

gpsscli stop kafkajson2gp --gpss-port 50007 --gpss-host mdw

Output:

20200225:22:11:04 gpsscli:gpadmin:greenplum-001:010801-[INFO]:-Stop a job: kafkajson2gp, status JOB_STOPPED

4.3.Start the service with gpkafka:

gpkafka load can be understood as replacing the gpsscli job commands above: submit, start, and so on.

gpkafka --config gpss4ic.json load kafka_testdata_json.yaml

Check the offsets saved in the database:

select * from kafka_test.gpkafka_data_from_kafka_12ead185469b45cc8e5be3c9f0ea14a2 limit 10;
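
The table-name suffix is a hash generated per job, so it differs in every deployment. One way to locate your own offset table is a catalog lookup; this sketch assumes only the name prefix visible in the query above:

-- Find gpkafka-generated tables by name prefix (the prefix is an assumption).
SELECT schemaname, tablename
FROM   pg_tables
WHERE  tablename LIKE 'gpkafka_data_from_kafka_%';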

5.Real-time data synchronization

5.1.maxwell + Kafka + bireme

This approach uses maxwell + Kafka + bireme to synchronize MySQL data to Greenplum in real time.
maxwell parses the MySQL binlog in real time and sends the resulting JSON data to Kafka; Kafka serves mainly as a message relay in this architecture; bireme consumes the Kafka messages and applies them to Greenplum to synchronize data incrementally.


maxwell is an application that reads the MySQL binary log (binlog) in real time, generates JSON-formatted messages, and, acting as a producer, sends them to Kafka, Kinesis, RabbitMQ, Redis, Google Cloud Pub/Sub, files, or other targets; Kafka is the most fully supported of these messaging systems. Common use cases include ETL, cache maintenance, collecting table-level DML metrics, incremental feeds to search engines, data partition migration, and binlog-based rollback schemes for database splits.

maxwell is fairly active on GitHub; its home page is https://github.com/zendesk/maxwell.

maxwell provides the following main features (a startup sketch follows the list):

  1. Supports full data initialization via SELECT * FROM table.
  2. Supports GTID; after a MySQL failover, it automatically recovers the binlog position.
  3. Can partition the data to counter data skew; data sent to Kafka supports partitioning at the database, table, column, and other levels.
  4. Works by masquerading as a MySQL slave: it opens a dump-thread connection to the primary, receives binlog events, and assembles them into JSON strings using the schema information; it handles DDL, XID, row, and other event types.
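
A minimal startup sketch based on the maxwell quick-start documentation; the credentials, MySQL host, and topic are placeholders rather than values from this article's environment:

# Start maxwell as a Kafka producer (placeholder credentials and addresses).
bin/maxwell --user='maxwell' --password='XXXXXX' --host='127.0.0.1' \
    --producer=kafka --kafka.bootstrap.servers=192.168.12.115:7776 \
    --kafka_topic=maxwell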

bireme is an incremental synchronization tool for Greenplum data warehouses; it currently supports MySQL, PostgreSQL, and MongoDB sources, and maxwell + Kafka is one of the supported source types.
As a Kafka consumer, bireme applies source changes to Greenplum using a DELETE + COPY strategy, which is faster and performs better than row-by-row INSERT, UPDATE, and DELETE. Its main design point is micro-batch loading (the default load delay is 10 seconds) to improve synchronization throughput, but it requires every synchronized table to have a primary key in both the source and target databases. The bireme home page is https://github.com/HashDataInc/bireme/.
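
For orientation, bireme is driven by a properties file. The sketch below is an assumption modeled on the examples in the bireme README; the key names and the reuse of this article's connection values are unverified, so check them against the version you deploy:

# etc/config.properties -- hypothetical sketch, not from this deployment
target.url = jdbc:postgresql://mdw:5432/yloms   # Greenplum master connection
target.user = gpss_usr
target.passwd = gpss_usr

data_source = mysql            # logical name of the source
mysql.type = maxwell           # consume maxwell-format Kafka messages
mysql.kafka.server = 192.168.12.115:7776
mysql.kafka.topic = maxwell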

In this architecture Kafka is the message middleware that bridges maxwell and bireme; both the upstream and downstream components depend on it.

5.2.Canal + Kafka + ClientAdapter


Canal is an open-source component from Alibaba that resembles maxwell in both functionality and implementation. Its main purpose is incremental log parsing for MySQL, providing incremental data subscription and consumption. Its working principle is fairly simple:

  1. Canal emulates the MySQL slave interaction protocol: it masquerades as a MySQL slave and sends the dump protocol to the MySQL master.
  2. The MySQL master receives the dump request and starts pushing the binary log to the slave (i.e., Canal).
  3. Canal parses the binary log objects (raw byte streams).

A Canal Server represents one running Canal instance, corresponding to one JVM. An Instance corresponds to one data queue, and one Server hosts 1..n Instances. Within an Instance, EventParser handles source ingestion, interacting with the master as a simulated slave and parsing the protocol; EventSink connects the Parser and the Store, performing data filtering, transformation, and distribution; EventStore stores the data; MetaManager manages incremental subscription and consumption metadata.


Since version 1.1.1, Canal supports delivering the binlog data received by the Canal Server directly to a message queue; Kafka and RocketMQ are supported out of the box. Earlier versions of Canal only provided a client API, and users had to write their own client programs to implement the consumption logic. Version 1.1.1 also added client-adapter, which provides ready-made adapters and launchers for landing client data.
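
Canal's MQ delivery is switched on in its configuration files. Key names vary across Canal versions, so the sketch below is an assumption modeled on the Canal documentation rather than a verified configuration:

# conf/canal.properties -- hypothetical sketch; verify keys against your Canal version
canal.serverMode = kafka                       # deliver binlog events to Kafka
kafka.bootstrap.servers = 192.168.12.115:7776  # broker list (placeholder)

# conf/example/instance.properties
canal.mq.topic = example                       # target topic (placeholder)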
