暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Kafka TO Doris 保姆级入门详解

大数据技能圈 2024-10-26
78

  导读   本文主要分享如何快速将Kafka数据接入至Doris。

全文目录:

  1. 环境信息

  2. Kafka介绍

  3. Kafka安装部署

  4. Routine Load介绍

  5. Routine Load体验

  6. Routine Load常见问题


环境信息

1. 硬件信息

  • CPU:4C

  • CPU架构:ARM

  • 内存:8G

  • 硬盘:66G SSD

2. 软件信息

  • VM镜像版本:CentOS-7

  • Apache Doris版本:2.0.2-rc05

  • Apache Kafka版本:3.2.0

Kafka介绍

Apache Kafka 是一个高效、可扩展的、高吞吐的、可容错的分布式发布订阅式的消息系统,能够将消息数据从一个端点传递到另一个端点,较之传统的消息中间件(例如 RocketMQ、RabbitMQ),Kafka 具有高吞吐量、内置分区、支持消息副本和高容错的特性,非常适合大规模消息数据处理。

Kafka安装部署

1. Kafka下载

    #根据自己scala版本和系统进行下载
    wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.12-3.2.0.tgz


    #创建安装文件夹
    mkdir -p opt/kafka3.2
    cd /opt/kafka3.2


    #解压安装
    tar -xvf kafka_2.12-3.2.0.tgz
    mv kafka_2.12-3.2.0.tgz/* ./
    rm -rf kafka_2.12-3.2.0.tgz*


    #创建日志目录
    mkdir logs

    2. Kafka初始化

    修改kafka-server配置。

      #修改kafka-server的配置文件
      vim config/server.properties


      #修改如下
      log.dirs=/opt/kafka3.2/logs
      listeners=PLAINTEXT://doris:9092
      auto.create.topics.enable=true


      #其它的如果是单机可以不用改
      port=9092 #端口号
      host.name=localhost #单机可直接用localhost
      log.dirs=/opt/monitor/kafka/kafka_dat #日志存放路径可修改可不修改
      zookeeper.connect=localhost:2181 #zookeeper地址和端口,单机配置部署,localhost:2181 

      修改自带zookeeper配置,也可以自己另外部署zk不适用自带的zk

        vim config/zookeeper.properties 


        #修改如下
        tickTime=2000
        dataDir=/opt/kafka3.2/zookeeper_data


        #创建zk的数据存储目录
        mkdir /opt/kafka3.2/zookeeper_data

        3. 启动Kafka和ZK

          #启动ZK
          ./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties
          #启动Kafka
          ./bin/kafka-server-start.sh -daemon ./config/server.properties


          #jps查看进程
          jps

          4. 服务测试

          ① Topic测试。

            #创建topic,使用 kafka-topics.sh 创建单分区单副本的 topic test01
            ./bin/kafka-topics.sh --create --bootstrap-server doris:9092 --replication-factor 1 --partitions 1 --topic test01


            #查询topic列表
            ./bin/kafka-topics.sh --list --bootstrap-server doris:9092
            #指定查看
            ./bin/kafka-topics.sh --bootstrap-server doris:9092 --describe --topic test01


            #删除topic
            ./bin/kafka-topics.sh --bootstrap-server doris:9092 --delete --topic test01

            ② Producer测试。

              #开一个窗,启动生产者
              ./bin/kafka-console-producer.sh --broker-list doris:9092 --topic test01

              ③ Consumer测试。

                #开一个窗,启动消费者
                #旧版本
                ./bin/kafka-console-consumer.sh --bootstrap-server doris:9092 --topic test01 --from-beginning
                #新版本
                ./bin/kafka-console-consumer.sh --bootstrap-server doris:9092 --topic test01 --from-beginning


                #查看kafka生产最大位置偏移量
                ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list doris:9092 --topic test01 --time -1

                Routine Load介绍

                Routine Load适合Kafka直接实时写数据到Doris的场景;它支持用户提交一个常驻的导入任务,通过不断地从指定的数据源中读取数据,将数据导入到 Doris 中。

                场景说明:

                ① Kafka To Doris可支持单表或多表导入

                Kafka数据直接同步至Doris对应表中,并可进行where数据筛选

                ③ 不适合硬删除的源数据,如果是硬删除建议转一次Flink改为软删除入Doris或其它方式处理

                Routine Load体验

                1. 创建Doris结果测试表

                  -- 创建测试库
                  create database routine_load;


                  -- 切换为测试库
                  use routine_load;


                  -- 创建测试结果表
                  CREATE TABLE rl_test01 (
                  `id` varchar(1000) NULL COMMENT "来源库表键",
                  `test01` BIGINT SUM DEFAULT "0" COMMENT "测试"
                  ) ENGINE=OLAP
                  AGGREGATE KEY(`id`)
                  DISTRIBUTED BY HASH(`id`) BUCKETS 1
                  PROPERTIES (
                  "replication_allocation" = "tag.location.default: 1",
                  "in_memory" = "false",
                  "storage_format" = "V2"
                  );

                  2. 创建Routine Load任务

                    CREATE ROUTINE LOAD routine_load.rl_test01 -- db.任务名,任务名可自定义
                    ON rl_test01 -- 与表名同名
                    COLUMNS TERMINATED BY ",", -- 默认空格
                    COLUMNS(id,test01) -- 字段名和表里对应
                    PROPERTIES
                    (
                    "desired_concurrent_number"="3",
                    "max_batch_interval" = "20",
                    "max_batch_rows" = "200000",
                    "max_batch_size" = "209715200",
                    "strict_mode" = "true", -- 默认为false,建议开启;开启后如果有shcema质量问题会在SHOW ROUTINE LOAD中的ErrorLogUrls中输出详情URL
                    "format" = "json" -- 默认为csv
                    )
                    FROM KAFKA
                    (
                    "kafka_broker_list" = "192.168.1.61:9092",
                    "kafka_topic" = "rl_test01", -- 对应的topic名
                    "property.group.id" = "rl_test01_group", -- 可自定义
                    "property.client.id" = "rl_test01_client", -- 可自定义
                    "property.kafka_default_offsets" = "OFFSET_BEGINNING" -- 两个可选参数;OFFSET_BEGINNING: 从有数据的位置开始订阅;OFFSET_END: 从末尾开始订阅
                            );

                    3. 查看Routine Load

                      SHOW ROUTINE LOAD


                      -- 有以下4种State:
                      -- * NEED_SCHEDULE:作业等待被调度
                      -- * RUNNING:作业运行中
                      -- * PAUSED:作业被暂停
                      -- * STOPPED:作业已结束
                      -- * CANCELLED:作业已取消
                      RESUME ROUTINE LOAD FOR rl_test01.rl_test01

                      4. 发送Kafka测试数据

                        ./bin/kafka-console-producer.sh --broker-list doris:9092 --topic rl_test01


                        # 测试数据如下

                        5. 查看Doris结果数据

                          select * from rl_test01
                          Routine Load常见问题

                          1. failed to get all partitions of kafka topic

                          异常详情:detailMessage = Failed to get all partitions of kafka topic: rl_test01

                          可能原因:

                          • 机房访问不了本地host

                          • kafka未提前设置自动创建topic,即topic不存在需要创建

                          2. current error rows is more than max error num 

                          异常详情:ErrorReason{code=errCode = 102, msg=‘current error rows is more than max error num’}

                          原因:

                          max_error_number:默认为0导致,即不允许有错误行

                          3. host resolution failure

                          be.INFO异常详情:kafka error: Local: Host resolution failure, event: GroupCoordinator: kafka:9092: Failed to resolve 'kafka:9092': Name or service not known (after 8ms in state CONNECT)

                          原因:

                          be节点中未配置kafka集群host导致;无论FE还是BE都需要与Kafka集群保证网络互通,如果使用了host,则be节点也需要在/etc/hosts中配置相应的host

                          至此,《Kafka TO Doris 保姆级入门详解》分享结束,查阅过程中若遇到问题欢迎留言交流。

                          往期推荐

                          大数据平台开发规范示例

                          Apache Doris 资源隔离详解

                          Apache Doris IP变更问题详解

                          ChatGPT快速入门

                          如何正确地使用ChatGPT(角色扮演+提示工程)

                          AIGC快速入门体验之虚拟对象

                          超强满血不收费的AI绘图教程来了(在线Stable Diffusion一键即用)

                          文章转载自大数据技能圈,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                          评论