暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

日志从消息队列到持久化数据库

北漂悟道之路 2019-06-30
711


本系列连载日志系统的相关文章均以日志到kafka再到持久化插件为架构模型作为前提,首先按照上图,应该有一个数据的流向。

在之前的章节讲过日志收集的原理,那么收集起来的日志,经过producer传至kafka之后,是如何存储到数据库的呢?  

熟悉kafka概念的同学应该了解,是通过消费者(consumer)进行传输的。

本篇主讲基于logstash和自研插件入ES数据库。

先简单介绍ES分别是什么。

Elasticsearch是一个基于Apache Lucene(TM)的开源搜索引擎。无论在开源还是专有领域,Lucene可以被认为是迄今为止最先进、性能最好的、功能最全的搜索引擎库(官网)。

可以通过简单的RESTful API、各种语言的客户端甚至命令行与之交互。


  • 分布式的实时文件存储,每个字段都被索引并可被搜索

  • 分布式的实时分析搜索引擎

  • 可以扩展到上百台服务器,处理PB级结构化或非结构化数据






  • Logstash传输

logstash的原理在之前的《日志系统的数据过滤组件-Logstash》有介绍,不在赘述,本章节主要介绍如何配置Logstash。


 1input {
2    kafka {
3        bootstrap_servers => "192.168.1.11:9092,192.168.1.12:9092,192.168.1.13:9092"
4        client_id => "test1"
5        topics => ["test1"]
6        type => "test1"
7        codec => 'json'
8    }
9}
10
11output {
12     elasticsearch {
13        #action => "index"
14        hosts => ["localhost:9200"]
15        index =>  "%{appname}-%{+YYYY.MM.dd}" #根据项目名称动态创建索引
16        template => "/home/elasticsearch-6.3.1/config/templates/logstash.json" 索引模板地址
17        manage_template => false #关闭logstash默认索引模板
18        template_name => "crawl" #映射模板的名字
19        template_overwrite => true #如果设置为true,模板名字一样的时候,新的模板会覆盖旧的模板
20     }
21}


可以尝试配置出多个output和多个input





  • 自研插件

可以尝试通过多进程和多线程实现消费者的功能,基本思路如下:

一个线程:负责获取kafka指定topic的数据,格式化处理后,导入进程间的Queue1中。

一个线程:启动一个进程池,各子进程通过获取进程间Queue1的数据,组装出待导入es的格式数据,导入进程间的Queue2中。

一个线程:负责读取Queue2中的数据导入ES。

主进程:负载3个线程的切换。

注:后续代码不是按这个思路来的,只是简单的实现kafka的读取和ES的导入,不涉及以上所讲模式。


感兴趣的同学可以实现,kafka的topic的切换,可配,还不影响性能。






  • 消费者从kafka到es传输的全部代码

  1#!/usr/bin/python
2# -*- coding: UTF-8 -*-
3from pykafka import KafkaClient
4import logging
5import logging.config
6from ConfigUtil import ConfigUtil
7import datetime
8
9class KafkaPython:
10    logging.config.fileConfig("logging.conf")
11    logger = logging.getLogger("msg")
12    logger_data = logging.getLogger("data")
13
14    def __init__(self):
15        self.server = ConfigUtil().get("kafka","kafka_server")
16        self.topic  = ConfigUtil().get("kafka","topic")
17        self.group = ConfigUtil().get("kafka","group")
18        self.partition_id = int(ConfigUtil().get("kafka","partition"))
19        self.consumer_timeout_ms = int(ConfigUtil().get("kafka","consumer_timeout_ms"))
20        self.consumer = None
21        self.hosts = ConfigUtil().get("es","hosts")
22        self.index_name = ConfigUtil().get("es","index_name")
23        self.type_name = ConfigUtil().get("es","type_name")
24
25
26    def getConnect(self):
27        client = KafkaClient(self.server)
28        topic = client.topics[self.topic]
29        p = topic.partitions
30        ps={p.get(self.partition_id)}
31
32        self.consumer = topic.get_simple_consumer(
33            consumer_group=self.group,
34            auto_commit_enable=True,
35            consumer_timeout_ms=self.consumer_timeout_ms,
36            # num_consumer_fetchers=1,
37            # consumer_id='test1',
38            partitions=ps
39            )
40        self.starttime = datetime.datetime.now()
41
42    def beginConsumer(self):
43        print("beginConsumer kafka-python")
44        imprtEsData = ImportEsData(self.hosts,self.index_name,self.type_name)
45        #创建ACTIONS  
46        count = 0
47        ACTIONS = [] 
48
49        while True:
50            endtime = datetime.datetime.now()
51            print (endtime - self.starttime).seconds
52            for message in self.consumer:
53                if message is not None:
54                    try:
55                        count = count + 1
56                        # print(str(message.partition.id)+","+str(message.offset)+","+str(count))
57                        # self.logger.info(str(message.partition.id)+","+str(message.offset)+","+str(count))
58                        action = {  
59                            "_index": self.index_name,  
60                            "_type": self.type_name,  
61                            "_source": message.value
62                        }
63                        ACTIONS.append(action)
64                        if len(ACTIONS) >= 10000:
65                            imprtEsData.set_date(ACTIONS)
66                            ACTIONS = []
67                            self.consumer.commit_offsets()
68                            endtime = datetime.datetime.now()
69                            print (endtime - self.starttime).seconds
70                            #break
71                    except (Exception) as e:
72                        # self.consumer.commit_offsets()
73                        print(e)
74                        self.logger.error(e)
75                        self.logger.error(str(message.partition.id)+","+str(message.offset)+","+message.value+"\n")
76                        # self.logger_data.error(message.value+"\n")
77                    # self.consumer.commit_offsets()
78
79            if len(ACTIONS) > 0:
80                self.logger.info("等待时间超过,consumer_timeout_ms,把集合数据插入es")
81                imprtEsData.set_date(ACTIONS)
82                ACTIONS = []
83                self.consumer.commit_offsets()
84
85    def disConnect(self):
86        self.consumer.close()
87
88
89from elasticsearch import Elasticsearch  
90from elasticsearch.helpers import bulk
91class ImportEsData:
92
93    logging.config.fileConfig("logging.conf")
94    logger = logging.getLogger("msg")
95
96    def __init__(self,hosts,index,type):
97       self.es = Elasticsearch(hosts=hosts.strip(',').split(','), timeout=5000)
98       self.index = index
99       self.type = type
100
101    def set_date(self,data):  
102        # 批量处理  
103        success = bulk(self.es, data, index=self.index, raise_on_error=True)  
104        self.logger.info(success)
105
106
107if __name__ == '__main__':
108    kp = KafkaPython()
109    kp.getConnect()
110    kp.beginConsumer()







微信号:XiaoJiaQingShi

希望有所帮助

欢迎多多关注


留言板


文章转载自北漂悟道之路,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论