
Ingesting Data into HBase with Flume: flume-hbase-sink Usage in Detail

BigData社区 2019-08-22


I. How to use the three HBaseSink serializer modes


1. HBaseSink with SimpleHbaseEventSerializer

The following shows how to use HBaseSink with the SimpleHbaseEventSerializer:

    agenttest.channels = memoryChannel-1
    agenttest.sinks = hbaseSink-1
    agenttest.sinks.hbaseSink-1.type = org.apache.flume.sink.hbase.HBaseSink
    # HBase table name
    agenttest.sinks.hbaseSink-1.table = test_hbase_table
    # column family in the HBase table
    agenttest.sinks.hbaseSink-1.columnFamily = familycolumn-1
    agenttest.sinks.hbaseSink-1.serializer = org.apache.flume.sink.hbase.SimpleHbaseEventSerializer
    # column (within the column family) that receives the event body
    agenttest.sinks.hbaseSink-1.serializer.payloadColumn = columnname
    agenttest.sinks.hbaseSink-1.channel = memoryChannel-1
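
With this serializer the event body is stored verbatim as the value of payloadColumn, under a row key the serializer generates on its own. As a rough illustration (the row-key format and timestamps depend on the serializer defaults in your Flume version), a scan of the table would look something like:

    hbase(main):001:0> scan 'test_hbase_table'
    ROW                      COLUMN+CELL
     <generated row key>     column=familycolumn-1:columnname, timestamp=..., value=<event body>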

Note: when specifying which column of the column family to write to, the property must be serializer.payloadColumn as above; the following spellings do not work:

    agenttest.sinks.hbaseSink-1.columnName = columnname

or:

    agenttest.sinks.hbaseSink-1.column = columnname

2. HBaseSink with RegexHbaseEventSerializer

The following shows how to use HBaseSink with the RegexHbaseEventSerializer, which splits each event with a regular expression and writes the capture groups into multiple columns of the HBase table:

    agenttest.channels = memoryChannel-2
    agenttest.sinks = hbaseSink-2
    agenttest.sinks.hbaseSink-2.type = org.apache.flume.sink.hbase.HBaseSink
    agenttest.sinks.hbaseSink-2.table = test_hbase_table
    agenttest.sinks.hbaseSink-2.columnFamily = familycolumn-2
    agenttest.sinks.hbaseSink-2.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
    # For example, to split an nginx log whose fields look like [xxx] [yyy] [zzz] [nnn],
    # use one capture group per bracketed field:
    agenttest.sinks.hbaseSink-2.serializer.regex = \\[(.*?)\\]\\ \\[(.*?)\\]\\ \\[(.*?)\\]\\ \\[(.*?)\\]
    # names of the four columns (under column family familycolumn-2) that receive the four groups
    agenttest.sinks.hbaseSink-2.serializer.colNames = column-1,column-2,column-3,column-4
    agenttest.sinks.hbaseSink-2.channel = memoryChannel-2
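
To make the mapping concrete, here is a hypothetical input line in that format and where each capture group would land (the field values are made up for illustration):

    input event body:
      [2016-12-22 19:59:59] [GET] [/index.html] [200]

    resulting cells under column family familycolumn-2:
      column-1 = 2016-12-22 19:59:59
      column-2 = GET
      column-3 = /index.html
      column-4 = 200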


3. AsyncHBaseSink with SimpleAsyncHbaseEventSerializer

The following shows how to use AsyncHBaseSink with the SimpleAsyncHbaseEventSerializer:

    agenttest.channels = memoryChannel-3
    agenttest.sinks = hbaseSink-3
    agenttest.sinks.hbaseSink-3.type = org.apache.flume.sink.hbase.AsyncHBaseSink
    agenttest.sinks.hbaseSink-3.table = test_hbase_table
    agenttest.sinks.hbaseSink-3.columnFamily = familycolumn-3
    agenttest.sinks.hbaseSink-3.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
    # column (within the column family) that receives the event body
    agenttest.sinks.hbaseSink-3.serializer.payloadColumn = columnname
    agenttest.sinks.hbaseSink-3.channel = memoryChannel-3
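
Unlike HBaseSink, AsyncHBaseSink writes through the non-blocking asynchbase client and generally gives better throughput. Its batching can also be tuned; batchSize and timeout are standard AsyncHBaseSink properties, and the values below are just illustrative:

    # events written per transaction (illustrative value)
    agenttest.sinks.hbaseSink-3.batchSize = 100
    # how long (in ms) to wait for a batch to be acknowledged (illustrative value)
    agenttest.sinks.hbaseSink-3.timeout = 60000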


II. A worked example: building a big-data collection and aggregation system with Flume + HBase


1. Using the SimpleHbaseEventSerializer mode

First, create a table mikeal-hbase-table in HBase with two column families, familyclom1 and familyclom2:

    hbase(main):102:0> create 'mikeal-hbase-table','familyclom1','familyclom2'
    0 row(s) in 1.2490 seconds
    => Hbase::Table - mikeal-hbase-table
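
You can verify the table before wiring up Flume; describe should list both column families, familyclom1 and familyclom2:

    hbase(main):103:0> describe 'mikeal-hbase-table'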

Then write a Flume configuration file, test-flume-into-hbase.conf:

    # Read lines from a file in real time and store them in HBase unchanged
    agent.sources = logfile-source
    agent.channels = file-channel
    agent.sinks = hbase-sink

    # logfile-source configuration
    agent.sources.logfile-source.type = exec
    agent.sources.logfile-source.command = tail -f /data/flume-hbase-test/mkhbasetable/data/nginx.log
    agent.sources.logfile-source.checkperiodic = 50
    # wire the source to the channel
    agent.sources.logfile-source.channels = file-channel

    # channel configuration: a local file channel
    agent.channels.file-channel.type = file
    agent.channels.file-channel.checkpointDir = /data/flume-hbase-test/checkpoint
    agent.channels.file-channel.dataDirs = /data/flume-hbase-test/data

    # sink configuration: HBaseSink with SimpleHbaseEventSerializer
    agent.sinks.hbase-sink.type = org.apache.flume.sink.hbase.HBaseSink
    # HBase table name
    agent.sinks.hbase-sink.table = mikeal-hbase-table
    # column family in the HBase table
    agent.sinks.hbase-sink.columnFamily = familyclom1
    agent.sinks.hbase-sink.serializer = org.apache.flume.sink.hbase.SimpleHbaseEventSerializer
    # column (within the column family) that receives the event body
    agent.sinks.hbase-sink.serializer.payloadColumn = cloumn-1
    # wire the sink to the channel
    agent.sinks.hbase-sink.channel = file-channel

As this configuration shows, the local log file /data/flume-hbase-test/mkhbasetable/data/nginx.log is the real-time data source, the local directory /data/flume-hbase-test/data backs the file channel, and HBase is the sink (that is, events flow into HBase).

Note: the user submitting the flume-ng job (for example, a flume user) must have read/write access to /data/flume-hbase-test/mkhbasetable/data/nginx.log and /data/flume-hbase-test/data, and must also have read/write permission on HBase.
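
A minimal sketch of granting those permissions, assuming the agent runs as a user named flume (adjust the user and paths to your environment; the grant is only needed when HBase authorization is enabled):

    # filesystem read/write for the Flume user
    sudo chown -R flume:flume /data/flume-hbase-test

    # HBase table permissions, from the hbase shell
    grant 'flume', 'RW', 'mikeal-hbase-table'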

Start Flume:

    bin/flume-ng agent --name agent --conf /etc/flume/conf/agent/ --conf-file /etc/flume/conf/agent/test-flume-into-hbase.conf -Dflume.root.logger=DEBUG,console

In another shell, append some test lines:

                  echo "nging-1" >> data/flume-hbase-test/mkhbasetable/data/nginx.log;
                  echo "nging-2" >> data/flume-hbase-test/mkhbasetable/data/nginx.log;


Then scan the mikeal-hbase-table table: the two lines have been inserted as cell values.
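
An illustrative scan follows; row keys are generated by the serializer, so yours will differ:

    hbase(main):103:0> scan 'mikeal-hbase-table'
    ROW                      COLUMN+CELL
     <generated row key 1>   column=familyclom1:cloumn-1, timestamp=..., value=nging-1
     <generated row key 2>   column=familyclom1:cloumn-1, timestamp=..., value=nging-2
    2 row(s) in ... seconds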


2. Using the SimpleAsyncHbaseEventSerializer mode

To keep the example clean, truncate mikeal-hbase-table first:

    truncate 'mikeal-hbase-table'


Then write a Flume configuration file, test-flume-into-hbase-2.conf:

    # Read lines from a file in real time and store them in HBase unchanged
    agent.sources = logfile-source
    agent.channels = file-channel
    agent.sinks = hbase-sink

    # logfile-source configuration
    agent.sources.logfile-source.type = exec
    agent.sources.logfile-source.command = tail -f /data/flume-hbase-test/mkhbasetable/data/nginx.log
    agent.sources.logfile-source.checkperiodic = 50

    # channel configuration: a local file channel
    agent.channels.file-channel.type = file
    agent.channels.file-channel.checkpointDir = /data/flume-hbase-test/checkpoint
    agent.channels.file-channel.dataDirs = /data/flume-hbase-test/data

    # sink configuration: AsyncHBaseSink
    agent.sinks.hbase-sink.type = org.apache.flume.sink.hbase.AsyncHBaseSink
    agent.sinks.hbase-sink.table = mikeal-hbase-table
    agent.sinks.hbase-sink.columnFamily = familyclom1
    agent.sinks.hbase-sink.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
    agent.sinks.hbase-sink.serializer.payloadColumn = cloumn-1

    # wire the source and the sink to the channel
    agent.sources.logfile-source.channels = file-channel
    agent.sinks.hbase-sink.channel = file-channel

Start Flume:

    bin/flume-ng agent --name agent --conf /etc/flume/conf/agent/ --conf-file /etc/flume/conf/agent/test-flume-into-hbase-2.conf -Dflume.root.logger=DEBUG,console


In another shell, append some test lines:

                          echo "nging-1" >> data/flume-hbase-test/mkhbasetable/data/nginx.log;
                          echo "nging-two" >> data/flume-hbase-test/mkhbasetable/data/nginx.log;
                          echo "nging-three" >> data/flume-hbase-test/mkhbasetable/data/nginx.log;

Then scan mikeal-hbase-table again:
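
Again illustratively, all three values should now appear under familyclom1:cloumn-1:

    hbase(main):104:0> scan 'mikeal-hbase-table'
    ROW                      COLUMN+CELL
     <generated row key 1>   column=familyclom1:cloumn-1, timestamp=..., value=nging-1
     <generated row key 2>   column=familyclom1:cloumn-1, timestamp=..., value=nging-two
     <generated row key 3>   column=familyclom1:cloumn-1, timestamp=..., value=nging-three
    3 row(s) in ... seconds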


3. Using the RegexHbaseEventSerializer mode

RegexHbaseEventSerializer splits each event with a regular expression and stores the capture groups into multiple columns of the HBase table. This section shows a simple example of splitting events this way.

To keep the example clean, truncate mikeal-hbase-table first:

    truncate 'mikeal-hbase-table'


Then write a Flume configuration file, test-flume-into-hbase-3.conf:

    # Read lines from a file in real time and store them in HBase unchanged
    agent.sources = logfile-source
    agent.channels = file-channel
    agent.sinks = hbase-sink

    # logfile-source configuration
    agent.sources.logfile-source.type = exec
    agent.sources.logfile-source.command = tail -f /data/flume-hbase-test/mkhbasetable/data/nginx.log
    agent.sources.logfile-source.checkperiodic = 50

    # channel configuration: a local file channel
    agent.channels.file-channel.type = file
    agent.channels.file-channel.checkpointDir = /data/flume-hbase-test/checkpoint
    agent.channels.file-channel.dataDirs = /data/flume-hbase-test/data

    # sink configuration: HBaseSink with RegexHbaseEventSerializer
    agent.sinks.hbase-sink.type = org.apache.flume.sink.hbase.HBaseSink
    agent.sinks.hbase-sink.table = mikeal-hbase-table
    agent.sinks.hbase-sink.columnFamily = familyclom1
    agent.sinks.hbase-sink.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
    # Split an nginx-style log whose fields look like [xxx] [yyy] [zzz],
    # one capture group per bracketed field:
    agent.sinks.hbase-sink.serializer.regex = \\[(.*?)\\]\\ \\[(.*?)\\]\\ \\[(.*?)\\]
    agent.sinks.hbase-sink.serializer.colNames = time,url,number

    # wire the source and the sink to the channel
    agent.sources.logfile-source.channels = file-channel
    agent.sinks.hbase-sink.channel = file-channel
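
Before starting the agent, you can sanity-check the pattern against a sample line with GNU grep's PCRE mode (note that backslashes are doubled in the Flume properties file but written singly on the command line); if the pattern matches, grep echoes the line back:

    echo "[2016-12-22-19:59:59] [http://www.qq.com] [10]" | grep -P '\[(.*?)\] \[(.*?)\] \[(.*?)\]'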

Start Flume:

    bin/flume-ng agent --name agent --conf /etc/flume/conf/agent/ --conf-file /etc/flume/conf/agent/test-flume-into-hbase-3.conf -Dflume.root.logger=DEBUG,console

In another shell, append two log lines:

                                  echo "[2016-12-22-19:59:59] [http://www.qq.com] [10]" >> /data/flume-hbase-test/mkhbasetable/data/nginx.log;
                                  echo "[2016-12-22 20:00:12] [http://qzone.qq.com] [19]" >> /data/flume-hbase-test/mkhbasetable/data/nginx.log;

Then scan mikeal-hbase-table again:
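
An illustrative scan; each log line now becomes one row with three cells (HBase lists a row's columns in sorted order):

    hbase(main):105:0> scan 'mikeal-hbase-table'
    ROW                      COLUMN+CELL
     <generated row key 1>   column=familyclom1:number, timestamp=..., value=10
     <generated row key 1>   column=familyclom1:time, timestamp=..., value=2016-12-22-19:59:59
     <generated row key 1>   column=familyclom1:url, timestamp=..., value=http://www.qq.com
     <generated row key 2>   column=familyclom1:number, timestamp=..., value=19
     <generated row key 2>   column=familyclom1:time, timestamp=..., value=2016-12-22 20:00:12
     <generated row key 2>   column=familyclom1:url, timestamp=..., value=http://qzone.qq.com
    2 row(s) in ... seconds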



That's all for today. I hope it helps you on your learning journey; thanks for reading!













