暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

StreamSets实时采集MySQL数据到HBase

大数据真有意思 2020-08-01
362

点击关注上方“知了小巷”,

设为“置顶或星标”,第一时间送达干货。

本地HBase环境

    $ jps
    4082 Jps
    3556 NameNode
    3813 QuorumPeerMain
    3911 HMaster
    3642 DataNode
    3739 SecondaryNameNode
    3999 HRegionServer


    本地环境演示实例

    mysql环境

      $ docker ps
      CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
      09849f9303ae 1fdf3806e715 "/entrypoint.sh mysq…" 22 months ago Up About a minute (healthy) 0.0.0.0:3306->3306/tcp, 33060/tcp mysql

      mysql版本:8.0.12

      hbase环境版本:

      Apache Hadoop:hadoop-3.1.1

      Apache HBase:hbase-2.1.0

      Apache Phoenix:apache-phoenix-5.0.0-HBase-2.0-bin

      本地sdc环境

        $ docker run --restart on-failure -p 18630:18630 -d --name streamsets-dc streamsets/datacollector
        $ docker ps
        CONTAINER ID        IMAGE                      COMMAND                  CREATED             STATUS                       PORTS                               NAMES
        cd2d89509457        streamsets/datacollector   "/docker-entrypoint.…"   35 minutes ago      Up 35 minutes                0.0.0.0:18630->18630/tcp            streamsets-dc
        09849f9303ae        1fdf3806e715               "/entrypoint.sh mysq…"   22 months ago       Up About an hour (healthy)   0.0.0.0:3306->3306/tcp, 33060/tcp   mysql

        虽然docker很方便,但是在连接HBase时需要打通网络,由于本地HBase是localhost,访问有问题,所以放弃docker版sdc,改为本地解压版。

          [zk: localhost:2181(CONNECTED) 4] get hbase/master
          �master:16000 }��PBUF


          localhost�}�����.�}
          cZxid = 0x1a725
          ctime = Tue Jul 21 11:02:46 CST 2020
          mZxid = 0x1a725
          mtime = Tue Jul 21 11:02:46 CST 2020
          pZxid = 0x1a725
          cversion = 0
          dataVersion = 0
          aclVersion = 0
          ephemeralOwner = 0x1000069b14e0000
          dataLength = 57
          numChildren = 0

          SDC本地解压版:

            $ ./bin/streamsets dc
            Java 1.8 detected; adding $SDC_JAVA8_OPTS of "-XX:+UseConcMarkSweepGC -XX:+UseParNewGC -Djdk.nio.maxCachedBufferSize=262144" to $SDC_JAVA_OPTS
            Bypass activation because SDC contains only basic stage libraries.
            Logging initialized @2901ms to org.eclipse.jetty.util.log.Slf4jLog
            Running on URI : 'http://192.168.31.29:18630'

            文档地址:

            https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Getting_Started/GettingStarted_Title.html#concept_htw_ghg_jq

            sdc支持的HBase版本下文测试环境演示实例中可以看到。

            Phoenix与HBase服务的集成:

            只需要将Phoenix包解压后的phoenix-5.0.0-HBase-2.0-server.jarphoenix-core-5.0.0-HBase-2.0.jar两个jar包拷贝到hbase的lib目录下,修改hbase-site.xml,添加相关配置,重启hbase集群即可。

                <property>
              <name>phoenix.schema.isNamespaceMappingEnabled</name>
              <value>true</value>
              </property>
              <property>
              <name>phoenix.schema.mapSystemTablesToNamespace</name>
              <value>true</value>
              </property>

              为了方便通过sqlline.py访问phoenix,将hbase-site.xml复制一份到phoenix的bin目录下。

                $ ./bin/sqlline.py 
                Setting property: [incremental, false]
                Setting property: [isolation, TRANSACTION_READ_COMMITTED]
                issuing: !connect jdbc:phoenix: none none org.apache.phoenix.jdbc.PhoenixDriver
                Connecting to jdbc:phoenix:
                SLF4J: Class path contains multiple SLF4J bindings.
                SLF4J: Found binding in [jar:file:/Users/shaozhipeng/Development/apache-phoenix-5.0.0-HBase-2.0-bin/phoenix-5.0.0-HBase-2.0-client.jar!/org/slf4j/impl/StaticLoggerBinder.class]
                SLF4J: Found binding in [jar:file:/Users/shaozhipeng/Development/hadoop-3.1.1/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
                SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
                20/07/20 16:28:09 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                Connected to: Phoenix (version 5.0)
                Driver: PhoenixEmbeddedDriver (version 5.0)
                Autocommit status: true
                Transaction isolation: TRANSACTION_READ_COMMITTED
                Building list of tables and columns for tab-completion (set fastconnect to true to skip)...
                133/133 (100%) Done
                Done
                sqlline version 1.2.0
                0: jdbc:phoenix:> !tables
                +------------+--------------+-------------+---------------+----------+------------+----------------------------+-----------------+--------------+-----------------+---------------+-+
                | TABLE_CAT  | TABLE_SCHEM  | TABLE_NAME  |  TABLE_TYPE   | REMARKS  | TYPE_NAME  | SELF_REFERENCING_COL_NAME  | REF_GENERATION  | INDEX_STATE  | IMMUTABLE_ROWS  | SALT_BUCKETS  | |
                +------------+--------------+-------------+---------------+----------+------------+----------------------------+-----------------+--------------+-----------------+---------------+-+
                |            | SYSTEM       | CATALOG     | SYSTEM TABLE  |          |            |                            |                 |              | false           | null          | |
                |            | SYSTEM       | FUNCTION    | SYSTEM TABLE  |          |            |                            |                 |              | false           | null          | |
                |            | SYSTEM       | LOG         | SYSTEM TABLE  |          |            |                            |                 |              | true            | 32            | |
                |            | SYSTEM       | SEQUENCE    | SYSTEM TABLE  |          |            |                            |                 |              | false           | null          | |
                |            | SYSTEM       | STATS       | SYSTEM TABLE  |          |            |                            |                 |              | false           | null          | |
                +------------+--------------+-------------+---------------+----------+------------+----------------------------+-----------------+--------------+-----------------+---------------+-+
                0: jdbc:phoenix:> 

                MySQL创建表user

                  CREATE TABLE `user` (
                  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
                  `user_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL DEFAULT '' COMMENT '用户名',
                  `update_time` datetime NOT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
                  PRIMARY KEY (`id`)
                  ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

                  HBase创建命名空间和表

                    create_namespace 'ZLXX'
                    create 'ZLXX:USER', 'INFO'


                    hbase(main):001:0> create_namespace 'ZLXX'
                    Took 0.9431 seconds
                    hbase(main):002:0> create 'ZLXX:USER', 'INFO'
                    Created table ZLXX:USER
                    Took 1.4457 seconds
                    => Hbase::Table - ZLXX:USER

                    Phoenix创建schema和表映射

                      create schema ZLXX;
                      create table ZLXX.USER (
                      id varchar primary key,
                      info.id varchar,
                      info.user_name varchar,
                      info.update_time varchar
                      ) column_encoded_bytes=0;


                      0: jdbc:phoenix:> !tables
                      +------------+--------------+-------------+---------------+----------+------------+----------------------------+-----------------+--------------+-----------------+---------+
                      | TABLE_CAT  | TABLE_SCHEM  | TABLE_NAME  |  TABLE_TYPE   | REMARKS  | TYPE_NAME  | SELF_REFERENCING_COL_NAME  | REF_GENERATION  | INDEX_STATE  | IMMUTABLE_ROWS  | SALT_BU |
                      +------------+--------------+-------------+---------------+----------+------------+----------------------------+-----------------+--------------+-----------------+---------+
                      |            | SYSTEM       | CATALOG     | SYSTEM TABLE  |          |            |                            |                 |              | false           | null    |
                      |            | SYSTEM       | FUNCTION    | SYSTEM TABLE  |          |            |                            |                 |              | false           | null    |
                      |            | SYSTEM       | LOG         | SYSTEM TABLE  |          |            |                            |                 |              | true            | 32      |
                      |            | SYSTEM       | SEQUENCE    | SYSTEM TABLE  |          |            |                            |                 |              | false           | null    |
                      |            | SYSTEM       | STATS       | SYSTEM TABLE  |          |            |                            |                 |              | false           | null    |
                      |            | ZLXX         | USER        | TABLE         |          |            |                            |                 |              | false           | null    |
                      +------------+--------------+-------------+---------------+----------+------------+----------------------------+-----------------+--------------+-----------------+---------+


                      SDC创建pipeline流水线

                      需要先安装JDBC和CDH的组件


                      选择Origin:JDBC Query Consumer和Destination HBase

                      简单完整pipeline如图

                      上图中,直接从UI管理界面上传SDC-MySQL JDBC驱动,根据提示重启SDC即可

                      上传成功后,可以在列表里面看到

                      MySQL版本是8.0.12,需要注意jar包版本

                      https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.12/mysql-connector-java-8.0.12.jar


                      主要的数据源配置和数据目标系统配置

                      JDBC-MySQL:

                      HBase-CDH6.3.0:

                      Validate成功之后,直接Start,运行后的界面:

                      简单演示一下,往MySQL test.user表中插入和更新数据

                        INSERT INTO `test`.`user`(`id`, `user_name`, `update_time`) VALUES (2, 'ZLXX_INSERT', SYSDATE());


                        Record Count (since last startup)上面显示数量大于1是因为查询SQL里面的更新时间是大于等于OFFSET,而且每10秒扫描一次,因此会不断被扫描到,SQL里面加上了最近五分钟的限制,因此重复次数不会太多,如果是大于OFFSET,有可能会导致数据丢失。

                        hbase端的数据

                          hbase(main):004:0> scan 'ZLXX:USER', {LIMIT=>5}
                          ROW COLUMN+CELL
                          1 column=INFO:ID, timestamp=1595384382829, value=1
                          1 column=INFO:UPDATE_TIME, timestamp=1595384382829, value=20180903190809
                          1 column=INFO:USER_NAME, timestamp=1595384382829, value=\xE9\x82\xB5\xE5\xBF\x97\xE9\xB9\x8F
                          1 column=INFO:_0, timestamp=1595384382829, value=
                          2 column=INFO:ID, timestamp=1595388092263, value=2
                          2 column=INFO:UPDATE_TIME, timestamp=1595388092263, value=20200722111640
                          2 column=INFO:USER_NAME, timestamp=1595388092263, value=ZLXX_INSERT
                          2 row(s)
                          Took 0.0598 seconds

                          Phoenix端查询

                            0: jdbc:phoenix:> select * from zlxx.user limit 5;
                            +-----+-----+--------------+-----------------+
                            | ID | ID | USER_NAME | UPDATE_TIME |
                            +-----+-----+--------------+-----------------+
                            | 1 | 1 | 邵志鹏 | 20180903190809 |
                            | 2 | 2 | ZLXX_INSERT | 20200722111640 |
                            +-----+-----+--------------+-----------------+
                            2 rows selected (0.134 seconds)

                            MySQL test.user更新操作

                              UPDATE `test`.`user` SET `user_name` = 'ZLXX_IN_UPDATE', `update_time` = SYSDATE() WHERE `id` = 2;

                              Phoenix和HBase查询

                                0: jdbc:phoenix:> select * from zlxx.user limit 5;
                                +-----+-----+-----------------+-----------------+
                                | ID | ID | USER_NAME | UPDATE_TIME |
                                +-----+-----+-----------------+-----------------+
                                | 1 | 1 | 邵志鹏 | 20180903190809 |
                                | 2 | 2 | ZLXX_IN_UPDATE | 20200722113033 |
                                +-----+-----+-----------------+-----------------+
                                2 rows selected (0.128 seconds)
                                  hbase(main):005:0> scan 'ZLXX:USER', {LIMIT=>5}
                                  ROW COLUMN+CELL
                                  1 column=INFO:ID, timestamp=1595384382829, value=1
                                  1 column=INFO:UPDATE_TIME, timestamp=1595384382829, value=20180903190809
                                  1 column=INFO:USER_NAME, timestamp=1595384382829, value=\xE9\x82\xB5\xE5\xBF\x97\xE9\xB9\x8F
                                  1 column=INFO:_0, timestamp=1595384382829, value=
                                  2 column=INFO:ID, timestamp=1595388662927, value=2
                                  2 column=INFO:UPDATE_TIME, timestamp=1595388662927, value=20200722113033
                                  2 column=INFO:USER_NAME, timestamp=1595388662927, value=ZLXX_IN_UPDATE
                                  2 row(s)
                                  Took 0.0761 seconds

                                  需要注意上面Phoenix映射表的时候,两个ID字段,命名需要注意。

                                  【本地环境SDC实时采集MySQL数据到HBase并映射Phoenix表查询、END】


                                  附CDH的HBase版本:


                                  不同CDH版本对应的hbase版本(重要)


                                  往期推荐:

                                  到底什么样的企业应该建设数据中台?

                                  数据中台到底是不是大数据的下一站?

                                  Phoenix Java API配置及使用总结

                                  Phoenix表映射

                                  Phoenix视图映射

                                  Kafka消息送达语义说明

                                  Kafka基础知识总结

                                  Hadoop YARN:ApplicationMaster向ResourceManager注册AM源码调试

                                  Apache Hadoop YARN:Client<-->ResourceManager源码解析

                                  Apache Hadoop YARN:Client<-->ResourceManager源码DEBUG

                                  Hadoop YARN:ApplicationMaster与ResourceManager交互源码解析

                                  Hive企业级调优

                                  HiveQL查询连续三天有销售记录的店铺

                                  HiveQL实战蚂蚁森林低碳用户排名分析:解法一

                                  HiveQL实战蚂蚁森林低碳用户排名分析:解法二

                                  HiveQL实战蚂蚁森林植物申领统计分析

                                  Hive-函数

                                  Hive-查询

                                  Hive-DML(Data Manipulation Language)数据操作语言

                                  Hive-DDL(Data Definition Language)数据定义

                                  Hive优化(整理版)

                                  Spark Core之Shuffle解析

                                  数据仓库开发规范

                                  喜欢就分享-点赞-在看吧,谢谢~~

                                  文章转载自大数据真有意思,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                                  评论