“前端埋点数据的采集系列,一共6次分享来实现这个需求交付解决方案。
一、采集组件集群规划
根据前面的采集系统整体架构设计,埋点日志系统组件集群规划如下表所示:
| Node01 | Node02 | Node03 | |
| Flume(采集日志) | √ | √ | |
| Kafka | √ | √ | √ |
| Flume消费Kafka | √ |
Node01和02做日志采集组件工作;
Node03的Flume做消费Kafka信息;
Kafka在3台节点都做存储信息;

二、Flume集群搭建
1、安装flume组件
官网下载flume组件
上传安装包到linux集群上 解压tar.gz到/opt/module/flume下面
删除lib文件夹下的guava-11.0.2.jar
修改flume-env.sh.template
mv flume-env.sh.template flume-env.sh# Enviroment variables can be set here.#JAVA_HOME路径export JAVA_HOME=/opt/module/jdk1.8.0_14
三、Flume选型
官方网站Flume的数据流模型示意图:

Source类型 | 特点 |
2、Channel的选择
由于在Channel这里,我们选择使用Kafka channel,则Sink可以不用选择,节省了数据流模块,提高了效率。
四、Flume拦截器实现
选型好的flume数据流如下图

1.日志文件格式
新建taildir-kafkachannel-kafka.conf文件
vim taildir-kafkachannel-kafka.conf
(1)配置source
配置拦截器类:会在Java代码中使用
#flume配置信息#Source:Taildir Source#channel:kafka channel#sink没有,直接写入到kafkaa1.sources = r1a1.channels = c1a1.sources.r1.type = TAILDIRa1.sources.r1.channels = c1a1.sources.r1.positionFile = opt/module/flume-1.9.0/taildir_position.jsona1.sources.r1.filegroups = f1#/opt/module/applog/log/app.2021-04-30.loga1.sources.r1.filegroups.f1 = opt/module/applog/log/app.*#配置自定义的拦截器a1.sources.r1.interceptors = i1a1.sources.r1.interceptors.i1.type= com.wqs.flume.interceptor.ETLInterceptor$Builder
(2)配置channel
#描述channela1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel#flume采集是node01、02没有03,03后面做消费kafka.a1.channels.c1.kafka.bootstrap.servers = node01:9092,node02:9092#创建kafka的topic名字叫topic_loga1.channels.c1.kafka.topic = topic_loga1.channels.c1.parseAsFlumeEvent = false
(1)创建Maven工程
POM文件主要涉及
flueme核心包
fastJSON
插件打包工具maven-plugin
<dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version><!--定义范围为只在编译使用--><scope>provided</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.62</version></dependency>
(2)创建JSONUtils类检查JSON字符串合法性
/*** 验证数据是否是JSON* @param log* @return*/public static boolean isJSONValidate(String log){//加上异常处理,正常解析返回true,异常抛出try{JSON.parse(log);return true;}catch (JSONException e){return false;}}
(2)创建LogInterceptor类清理过滤前端日志
LogInterceptor类实现Interceptor接口,实现其Intercept方法,这个方法里面做解析过滤日志信息。
单Event里面调用JSONUtils类
根据isJSONValidate<>判断标准JSON
public Event intercept(Event event){//获取Body内容byte[] body = event.getBody();//body转化为String,且编码是UTF-8String log = new String(body, StandardCharsets.UTF_8);//调用JSONUtils类的isValidate方法对其log校验if(JSONUtils.isJSONValidate(log)){//标准JSON,返回eventreturn event;}else{//不是标准JSON,就清理脏数据,在哪里可以清理?集合里面清理return null;}}
List< Event>里面清理脏数据remove
public List<Event> intercept(List<Event> list){//使用迭代器来清理非标准JSONIterator iterator = list.iterator();while (iterator.hasNext()){//取出下一个eventEvent next = (Event) iterator.next();//调用单个event方法,返回null说明是非标准JSONif(intercept(next) == null){//如果为空,则表示它是非标准JSON去除它removeiterator.remove();}}return list;}

注意:由于我们是需要拿到linux集群上面flume的lib包下,需要拿带依赖的包。
五、Flume集群启动停止
1.上传到linux集群并且分发到node01、02
2.启动Flume集群
前台启动
bin/flume-ng agent --name a1 --conf-file conf/taildir-kafkachannel-kafka.conf
后台启动
bin/flume-ng agent --name a1 --conf-file conf/taildir-kafkachannel-kafka.conf &

添加 nohup不挂断进程
no hup bin/flume-ng agent --name a1 --conf-file conf/taildir-kafkachannel-kafka.conf &
3.关闭Flume集群

4.编写Flume集群的启动和关闭脚本
启动和关闭的时候我们不想一台一台敲,我们想通过脚本完成两台flume集群的开关闭。
#function:flume Cluster 采集log的 start and stop#!bin/bashcase $1 in"start"){for node in node01 node02doecho "===============启动 $node 采集flume ==============="ssh $node "nohup opt/module/flume-1.9.0/bin/flume-ng agent --name a1 --conf-file opt/module/flume-1.9.0/conf/taildir-kafkachannel-kafka.conf --name a1 Dflume.root.logger=INFO,LOGFILE 1>/opt/module/flume-1.9.0/log1.txt 2>&1 &"done};;"stop"){for node in node01 node02doecho "===============停止 $node 采集flume ==============="ssh $node "ps -ef | grep taildir-kafkachannel-kafka | grep -v grep |awk '{print \$2}' | xargs -n1 kill -9 "done};;esac
埋点日志的采集集群规划 Flume集群搭建和配置 Flume组件Source、Channel、Sink选型 Flume拦截器编写步骤 Flume集群启动和关闭
>>>>
Q&A
Caused by:cdjava.lang.ClassNotFoundException: com.google.common.collect.Listsat java.net.URLClassLoader.findClass(URLClassLoader.java:382)at java.lang.ClassLoader.loadClass(ClassLoader.java:424)at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)at java.lang.ClassLoader.loadClass(ClassLoader.java:357)... 1 more
vim /etc/profile.d/my_env.sh#HADOOP_HOMEexport HADOOP_HOME=/opt/module/hadoop-3.1.4export PATH=$PATH:$HADOOP_HOME/binexport PATH=$PATH:$HADOOP_HOME/sbin
Q2:在flume1.7版本前,为什么kafka channel很少会有人使用?

A3:
(1)先获取flume启动的进程
(2)在过滤自身启动进程
(3)在对字符串进行切割,默认分隔符为空格,取出第二个字符串即进程编号

(4)取到进程后最后“反杀”这个进程 xargs -n1 kill -9




