需求概述
本文我们将开发一款日志采集中心,将客户端的日志源源不断的输送到统一的日志中心做进一步的处理,如日志统计,报表图形展现等,需求如下:
日志采集客户端尽量保持简单,无侵入;
基于Socket通信;
服务端收到的日志统一
publish
到某个频道(可以是MQ或Redis等),需要使用日志的消费者程序通过订阅相应频道获取到客户端日志;报表统计分析(不在本文讨论范围)
Netty概述
Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序
开发一款稳定可靠性高的的基于socket的服务端程序,是一件不容易的事情,团灭市面上85%以上的程序员,你需要了解epoll模型,多线程,数据序列化,处理分包合包,定义包头包体,发送心跳,处理自动重连等,不过有了Netty
,这些工作将变得及其简单;
服务端实现思路
定义一个
MQManager
单例类,处理MQ的消息,将Netty服务端采集到的日志发往MQ Topic
,这样消费者程序通过订阅该Topic即可获取到客户端日志做统计分析(一对多的消费),程序如下:
1//发送消息
2Message msg = new Message(topicName,
3 tags,
4 keys,
5 body.getBytes());
6SendResult result = [DefaultMQProducer].send(msg);
7//订阅消息
8consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
9consumer.setMessageModel(MessageModel.BROADCASTING);
10consumer.subscribe("default", "*");
11consumer.registerMessageListener(new MessageListenerConcurrently() {
12 @Override
13 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
14 ConsumeConcurrentlyContext context) {
15 ......
16 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
17 }
18});
19consumer.start();
2. 启动Netty
服务端new ProtocolServer(port).run();
,服务端需要处理:
每3秒自动检测是否需要启动;
定义编解码;
收发消息处理;
客户端session维护;
可扩展:通过包头的action字段值确定接收到的消息处理方式,工厂模式,可以是写入文件,或publish到MQ,或publish到Redis等;
以发送到MQ为例,如下:

编码我们采用包头+包体的形式,其中包头第一个写入一个魔数,继而写入消息类型,消息行为(决定采用哪种方式处理日志消息),消息序列号,包体长度等,最后写入包体,如下:

服务端的总体框架代码如下:

3. 最后实现一个Log4j的Appender,给客户端log4j使用:
1@Plugin(name="nettyIOAppender", category="Core", elementType="appender", printObject=true)
2public final class NettyIOAppender extends AbstractAppender{
3@Override
4 public void append(LogEvent event) {
5 readLock.lock();
6 try {
7 final byte[] bytes = getLayout().toByteArray(event);
8 //logger.debug(StringUtils.repeat("-", 30) + StringUtils.center(s_appName, 12) + new String(bytes));
9 //为了节省代码,只有json格式的字符串才发往日志中心
10 Object body = new String(bytes).trim();
11 if(nettyIOTask!=null){
12 JSONObject json =new JSONObject();
13 json.put("app", s_appName);
14 json.put("lvl", event.getLevel().name());
15 json.put("name", event.getLoggerName());
16 json.put("msg", event.getMessage());
17 json.put("time", event.getTimeMillis());
18 nettyIOTask.getClient().sendMessage(StrUtil.formatNullStr(s_type).equals("mq")?ProtocolHeader.Action_Client.Log2ServerMQ:ProtocolHeader.Action_Client.Log2ServerPrint, json.toJSONString());
19 }
20 } catch (Exception ex) {
21 if (!ignoreExceptions()) {
22 throw new AppenderLoggingException(ex);
23 }
24 } finally {
25 readLock.unlock();
26 }
27 }
28 @PluginFactory
29 public static NettyIOAppender createAppender(
30 @PluginAttribute("name") String name,
31 @PluginElement("Layout") Layout<? extends Serializable> layout,
32 @PluginElement("Filter") final Filter filter,
33 @PluginAttribute("stype") String stype,
34 @PluginAttribute("host") String host,
35 @PluginAttribute("app-name") String appName) {
36
37 if (name == null) {
38 LOGGER.error("No name provided for MyCustomAppenderImpl");
39 return null;
40 }
41 s_type = stype;
42 s_host = host;
43 s_appName = appName;
44 /**连接Netty.io***/
45 Executor executor = Executors.newFixedThreadPool(1);
46 System.out.println("************netty server:" + s_host + ",appName="+s_appName+",s_type="+s_type);
47 nettyIOTask = new NettyIOTask(s_host,s_appName);
48 executor.execute(nettyIOTask);
49
50 if (layout == null) {
51 layout = PatternLayout.createDefaultLayout();
52 }
53 return new NettyIOAppender(name, filter, layout, true);
54 }
55}
客户端log4j配置
客户端的使用那就是逆天的简单了,只需两步:
引入刚才服务端代码common模块,内含编解码和网络通信,只需要在
pom.xml
文件中加入:
1<dependency>
2 <groupId>com.sumslack</groupId>
3 <artifactId>logger.monitor.common</artifactId>
4 <version>1.0.0</version>
5</dependency>
在
log4j2.xml
文件中加入:
1<?xml version="1.0" encoding="UTF-8"?>
2<Configuration status="DEBUG" packages="com.sumscope.tag.logging">
3 <Appenders>
4 <Console name="Console" target="SYSTEM_OUT">
5 <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
6 </Console>
7 <NettyIOAppender name="nettyIOAppender" stype="mq" host="122.227.138.62:23105" app-name="xxxx"></NettyIOAppender>
8 </Appenders>
9 <Loggers>
10 <Root level="DEBUG">
11 <AppenderRef ref="Console"/>
12 </Root>
13 <Root level="INFO">
14 <AppenderRef ref="nettyIOAppender"/>
15 </Root>
16 </Loggers>
17</Configuration>
长按识别二维码关注我们





