暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

前端埋点数据采集(三)Flume采集数据

畅谈Fintech 2021-06-25
1569

上一期“前端埋点数据采集(二)mock应用系统10万条前端埋点数据”,前端埋点日志数据已经生成到linux服务器集群了。

这期分享通过搭建Flume组件采集生成的日志数据,涉及Flume组件内容Source、channel、sink的选型以及flume拦截器的编写。




“前端埋点数据的采集系列,一共6次分享来实现这个需求交付解决方案。

一、采集系统架构设计
二、mock应用系统10万条前端埋点数据
三、Flume采集数据
四、Kafka生产消费数据
五.  Flume消费Kafka数据
六. 前端埋点数据采集总结






一、采集组件集群规划


1、Flume、Kafka组件集群规划

据前面的采集系统整体架构设计,埋点日志系统组件集群规划如下表所示:


Node01Node02

Node03

Flume(采集日志)     √     √
Kafka     √     √     √
Flume消费Kafka

     √
说明:
  • Node01和02做日志采集组件工作;

  • Node03的Flume做消费Kafka信息;

  • Kafka在3台节点都做存储信息;




二、Flume集群搭建


1、安装flume组件

  • 官网下载flume组件
        http://flume.apache.org/download.html
  • 上传安装包到linux集群上
  • 解压tar.gz到/opt/module/flume下面
2、部署和配置flume
  • 删除lib文件夹下的guava-11.0.2.jar

        guava这个包和Hadoop3.X不兼容。
  • 修改flume-env.sh.template

    mv flume-env.sh.template flume-env.sh
    # Enviroment variables can be set here.
    #JAVA_HOME路径
    export JAVA_HOME=/opt/module/jdk1.8.0_14




    三、Flume选型


    官方网站Flume的数据流模型示意图:



    1、Source的选择


    Source类型

    特点

    TailDir Source
    支持断点续传、多目录在1.7以上版本;
    1.6以下(含1.6) 版本,需要自定义Source记录每次读取文件位置,来实现断点续传。
    Exec Source
    可以实时搜集数据但是在Flume不运行或者Shell命令出错的情况下,数据将会丢失。
    Spooling 
    Directory  Source
    监控目录,支持断点续传。
    我们选择使用TailDir Source,flume1.9.0版本,支持断点续传功能。

    2、Channel的选择


    Channel类型
    特点
    File channel
    基于磁盘,断电数据保存,可靠性较高
    效率低,速度慢
    memory channel
    基于内存,断电数据丢失,可靠性较低
    效率高,速度快
    Kafka channel
    数据存储在Kafka里面,Kafka存储于磁盘,可靠性高
    等价于 memory channel  + Kafka sink
    选择使用Kafka channel基于以下两点:
    (1)Kafka存储于磁盘,可靠性高;
    (2)Flume下一级是Kafka,选择Kafka channel 可以省略Sink,提高效率。


    3、Sink的选择


    由于在Channel这里,我们选择使用Kafka channel,则Sink可以不用选择,节省了数据流模块,提高了效率。




    四、Flume拦截器实现

    选型好的flume数据流如下图


    1.日志文件格式

    前端产生的日志,使用flume组件就可以直接按照log的日志格式采集:
    app.yyyy-mm-dd.log
    2.为什么还需要flume拦截器呢?
    可不可以不配置拦截器呢?答案是可以不做拦截器,放到数仓里面做!
    但是我们为了规范采集日志的格式,首先需要对日志进行清理,前端送过来的是JSON格式数据,如果不满足JSON格式,我们丢弃该数据!
    3.配置flume的Source-Channel

    新建taildir-kafkachannel-kafka.conf文件

      vim taildir-kafkachannel-kafka.conf

      (1)配置source

      配置拦截器类:会在Java代码中使用

        #flume配置信息
        #Source:Taildir Source
        #channel:kafka channel
        #sink没有,直接写入到kafka
        a1.sources = r1
        a1.channels = c1
        a1.sources.r1.type = TAILDIR
        a1.sources.r1.channels = c1
        a1.sources.r1.positionFile = opt/module/flume-1.9.0/taildir_position.json
        a1.sources.r1.filegroups = f1


        #/opt/module/applog/log/app.2021-04-30.log
        a1.sources.r1.filegroups.f1 = opt/module/applog/log/app.*
        #配置自定义的拦截器
        a1.sources.r1.interceptors = i1
        a1.sources.r1.interceptors.i1.type= com.wqs.flume.interceptor.ETLInterceptor$Builder

        (2)配置channel

          #描述channel
          a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
          #flume采集是node01、02没有03,03后面做消费kafka.
          a1.channels.c1.kafka.bootstrap.servers = node01:9092,node02:9092
          #创建kafka的topic名字叫topic_log
          a1.channels.c1.kafka.topic = topic_log
          a1.channels.c1.parseAsFlumeEvent = false


          3.配置拦截器

          (1)创建Maven工程

          POM文件主要涉及

          • flueme核心包

          • fastJSON

          • 插件打包工具maven-plugin

            <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.9.0</version>
            <!--定义范围为只在编译使用-->
            <scope>provided</scope>
            </dependency>
            <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.62</version>
            </dependency>

            (2)创建JSONUtils类检查JSON字符串合法性

              /**
              * 验证数据是否是JSON
              * @param log
              * @return
              */
              public static boolean isJSONValidate(String log)
              {
              //加上异常处理,正常解析返回true,异常抛出
              try
              {
              JSON.parse(log);
              return true;
              }catch (JSONException e)
              {
              return false;
              }
              }

              (2)创建LogInterceptor类清理过滤前端日志

              LogInterceptor类实现Interceptor接口,实现其Intercept方法,这个方法里面做解析过滤日志信息。

              • 单Event里面调用JSONUtils类

              • 根据isJSONValidate<>判断标准JSON

                public Event intercept(Event event)
                {
                //获取Body内容
                byte[] body = event.getBody();
                //body转化为String,且编码是UTF-8
                String log = new String(body, StandardCharsets.UTF_8);
                //调用JSONUtils类的isValidate方法对其log校验
                if(JSONUtils.isJSONValidate(log))
                {
                //标准JSON,返回event
                return event;
                }
                else
                {
                //不是标准JSON,就清理脏数据,在哪里可以清理?集合里面清理
                return null;
                }
                }
                • List< Event>里面清理脏数据remove

                  public List<Event> intercept(List<Event> list)
                  {
                  //使用迭代器来清理非标准JSON
                  Iterator iterator = list.iterator();
                  while (iterator.hasNext())
                  {
                  //取出下一个event
                  Event next = (Event) iterator.next();
                  //调用单个event方法,返回null说明是非标准JSON
                  if(intercept(next) == null)
                  {
                  //如果为空,则表示它是非标准JSON去除它remove
                  iterator.remove();
                  }
                  }
                  return list;
                  }
                  4.Maven打jar包

                  注意:由于我们是需要拿到linux集群上面flume的lib包下,需要拿带依赖的包。



                  五、Flume集群启动停止


                  1.上传到linux集群并且分发到node01、02

                  放到/opt/module/flume-1.9.0/lib

                  2.启动Flume集群

                  • 前台启动

                    bin/flume-ng agent --name a1 --conf-file conf/taildir-kafkachannel-kafka.conf
                    • 后台启动

                      bin/flume-ng agent --name a1 --conf-file conf/taildir-kafkachannel-kafka.conf &
                      发现不管是前台还是后台启动,当客户端关闭后都会丢失Application进程


                      • 添加 nohup不挂断进程

                        no hup bin/flume-ng agent --name a1 --conf-file conf/taildir-kafkachannel-kafka.conf &


                        3.关闭Flume集群

                        我们找到启动的进程,编号然后杀死进程编号实现关闭。

                        4.编写Flume集群的启动和关闭脚本

                        启动和关闭的时候我们不想一台一台敲,我们想通过脚本完成两台flume集群的开关闭

                           #function:flume Cluster 采集log的 start and stop
                          #!bin/bash
                          case $1 in
                          "start"){
                          for node in node01 node02
                          do
                          echo "===============启动 $node 采集flume ==============="
                          ssh $node "nohup opt/module/flume-1.9.0/bin/flume-ng agent --name a1 --conf-file opt/module/flume-1.9.0/conf/taildir-kafkachannel-kafka.conf --name a1 Dflume.root.logger=INFO,LOGFILE 1>/opt/module/flume-1.9.0/log1.txt 2>&1 &"
                          done


                          };;
                          "stop"){
                          for node in node01 node02
                          do
                          echo "===============停止 $node 采集flume ==============="
                          ssh $node "ps -ef | grep taildir-kafkachannel-kafka | grep -v grep |awk '{print \$2}' | xargs -n1 kill -9 "
                          done
                          };;
                          esac                                                                                                                              





                          总结:
                          1. 埋点日志的采集集群规划
                          2. Flume集群搭建和配置
                          3. Flume组件Source、Channel、Sink选型
                          4. Flume拦截器编写步骤
                          5. Flume集群启动和关闭




                          >>>>

                          Q&A

                          Q1:删除flume下lib下的guava-11.02.jar,报错:
                            Caused by: 
                            cdjava.lang.ClassNotFoundException: com.google.common.collect.Lists
                            at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
                            at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
                            at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
                            at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
                                    ... 1 more
                            A1:报错说明找不到对应的类,说明没有配置HADOOP_HOME环境变量
                                     需要配置HADOOP_HOME环境变量
                              vim /etc/profile.d/my_env.sh
                              #HADOOP_HOME
                              export HADOOP_HOME=/opt/module/hadoop-3.1.4
                              export PATH=$PATH:$HADOOP_HOME/bin
                              export PATH=$PATH:$HADOOP_HOME/sbin

                              Q2:在flume1.7版本前,为什么kafka channel很少会有人使用?

                              A2:flume1.7版本前(比如flume1.6版本)parseAsFlumeEvent的值无论设置true 还是false,都会转变为Flume Event即包含header + content。这样的话,还需要在处理header把它拿掉,我们一般业务是只要content内容。而在flume1.7以上版本这个问题已经修正了。


                              Q3:杀死进程的脚本怎么理解?
                              ps -ef | grep taildir-kafkachannel-kafka | grep -v grep |awk  '{print \$2}' | xargs -n1 kill -9

                              A3:

                              (1)先获取flume启动的进程

                              ps -ef | grep taildir-kafkachannel-kafka

                              (2)在过滤自身启动进程

                              ps -ef | grep taildir-kafkachannel-kafka | grep -v grep

                              (3)在对字符串进行切割,默认分隔符为空格,取出第二个字符串即进程编号

                              ps -ef | grep taildir-kafkachannel-kafka | grep -v grep |awk  '{print \$2}'

                              (4)取到进程后最后“反杀”这个进程 xargs -n1 kill -9

                              ps -ef | grep taildir-kafkachannel-kafka | grep -v grep |awk  '{print \$2}' | xargs -n1 kill -9


                              文章转载自畅谈Fintech,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                              评论