暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

基于Netty的日志采集中心设计与实现

Sumslack团队 2018-07-17
1076

需求概述

本文我们将开发一款日志采集中心,将客户端的日志源源不断的输送到统一的日志中心做进一步的处理,如日志统计,报表图形展现等,需求如下:

  • 日志采集客户端尽量保持简单,无侵入;

  • 基于Socket通信;

  • 服务端收到的日志统一publish
    到某个频道(可以是MQ或Redis等),需要使用日志的消费者程序通过订阅相应频道获取到客户端日志;

  • 报表统计分析(不在本文讨论范围)

Netty概述

Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序

开发一款稳定可靠性高的的基于socket的服务端程序,是一件不容易的事情,团灭市面上85%以上的程序员,你需要了解epoll模型,多线程,数据序列化,处理分包合包,定义包头包体,发送心跳,处理自动重连等,不过有了Netty
,这些工作将变得及其简单;

服务端实现思路

  1. 定义一个MQManager
    单例类,处理MQ的消息,将Netty服务端采集到的日志发往MQ Topic
    ,这样消费者程序通过订阅该Topic即可获取到客户端日志做统计分析(一对多的消费),程序如下:


 1//发送消息
2Message msg = new Message(topicName,   
3                tags,   
4                keys,   
5                body.getBytes());  
6SendResult result = [DefaultMQProducer].send(msg);
7//订阅消息
8consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
9consumer.setMessageModel(MessageModel.BROADCASTING);
10consumer.subscribe("default""*");
11consumer.registerMessageListener(new MessageListenerConcurrently() {
12      @Override
13      public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
14                 ConsumeConcurrentlyContext context)
 
{
15            ......
16              return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
17      }
18});
19consumer.start();

 2. 启动Netty
服务端new ProtocolServer(port).run();
,服务端需要处理:

  • 每3秒自动检测是否需要启动;

  • 定义编解码;

  • 收发消息处理;

  • 客户端session维护;

  • 可扩展:通过包头的action字段值确定接收到的消息处理方式,工厂模式,可以是写入文件,或publish到MQ,或publish到Redis等;
    以发送到MQ为例,如下:

编码我们采用包头+包体的形式,其中包头第一个写入一个魔数,继而写入消息类型,消息行为(决定采用哪种方式处理日志消息),消息序列号,包体长度等,最后写入包体,如下:

服务端的总体框架代码如下:


3. 最后实现一个Log4j的Appender,给客户端log4j使用:


 1@Plugin(name="nettyIOAppender", category="Core", elementType="appender", printObject=true)
2public final class NettyIOAppender extends AbstractAppender{
3@Override
4    public void append(LogEvent event) {
5        readLock.lock();
6        try {
7            final byte[] bytes = getLayout().toByteArray(event);
8            //logger.debug(StringUtils.repeat("-", 30) + StringUtils.center(s_appName, 12) + new String(bytes));
9            //为了节省代码,只有json格式的字符串才发往日志中心
10            Object body = new String(bytes).trim();
11            if(nettyIOTask!=null){
12                JSONObject json  =new JSONObject();
13                json.put("app", s_appName);
14                json.put("lvl", event.getLevel().name());
15                json.put("name", event.getLoggerName());
16                json.put("msg", event.getMessage());
17                json.put("time", event.getTimeMillis());
18                nettyIOTask.getClient().sendMessage(StrUtil.formatNullStr(s_type).equals("mq")?ProtocolHeader.Action_Client.Log2ServerMQ:ProtocolHeader.Action_Client.Log2ServerPrint, json.toJSONString());
19            }
20        } catch (Exception ex) {
21            if (!ignoreExceptions()) {
22                throw new AppenderLoggingException(ex);
23            }
24        } finally {
25            readLock.unlock();
26        }
27    }
28    @PluginFactory
29    public static NettyIOAppender createAppender(
30            @PluginAttribute("name"String name,
31            @PluginElement("Layout") Layout<? extends Serializable> layout,
32            @PluginElement("Filter"final Filter filter,
33            @PluginAttribute("stype"String stype,
34            @PluginAttribute("host"String host,
35            @PluginAttribute("app-name"String appName) {
36
37        if (name == null) {
38            LOGGER.error("No name provided for MyCustomAppenderImpl");
39            return null;
40        }
41        s_type = stype;
42        s_host = host;
43        s_appName = appName;
44        /**连接Netty.io***/
45        Executor executor = Executors.newFixedThreadPool(1);
46        System.out.println("************netty server:" + s_host + ",appName="+s_appName+",s_type="+s_type);
47        nettyIOTask = new NettyIOTask(s_host,s_appName);
48        executor.execute(nettyIOTask);
49
50        if (layout == null) {
51            layout = PatternLayout.createDefaultLayout();
52        }
53        return new NettyIOAppender(name, filter, layout, true);
54    }
55}


客户端log4j配置

客户端的使用那就是逆天的简单了,只需两步:

  • 引入刚才服务端代码common模块,内含编解码和网络通信,只需要在pom.xml
    文件中加入:

1<dependency>
2    <groupId>com.sumslack</groupId>
3    <artifactId>logger.monitor.common</artifactId>
4    <version>1.0.0</version>
5</dependency>

  • log4j2.xml
    文件中加入:

 1<?xml version="1.0" encoding="UTF-8"?>
2<Configuration status="DEBUG" packages="com.sumscope.tag.logging">
3  <Appenders>
4    <Console name="Console" target="SYSTEM_OUT">
5      <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
6    </Console>
7     <NettyIOAppender name="nettyIOAppender" stype="mq" host="122.227.138.62:23105" app-name="xxxx"></NettyIOAppender>
8  </Appenders>
9  <Loggers>
10    <Root level="DEBUG">
11      <AppenderRef ref="Console"/>
12    </Root>
13    <Root level="INFO">
14      <AppenderRef ref="nettyIOAppender"/>
15    </Root>
16  </Loggers>
17</Configuration>


长按识别二维码关注我们


文章转载自Sumslack团队,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论