
本系列连载日志系统的相关文章均以日志到kafka再到持久化插件为架构模型作为前提,首先按照上图,应该有一个数据的流向。
在之前的章节讲过日志收集的原理,那么收集起来的日志,经过producer传至kafka之后,是如何存储到数据库的呢?
熟悉kafka概念的同学应该了解,是通过消费者(consumer)进行传输的。
本篇主讲基于logstash和自研插件入ES数据库。

先简单介绍ES分别是什么。
Elasticsearch是一个基于Apache Lucene(TM)的开源搜索引擎。无论在开源还是专有领域,Lucene可以被认为是迄今为止最先进、性能最好的、功能最全的搜索引擎库(官网)。
可以通过简单的RESTful API、各种语言的客户端甚至命令行与之交互。
分布式的实时文件存储,每个字段都被索引并可被搜索
分布式的实时分析搜索引擎
可以扩展到上百台服务器,处理PB级结构化或非结构化数据

Logstash传输
logstash的原理在之前的《日志系统的数据过滤组件-Logstash》有介绍,不在赘述,本章节主要介绍如何配置Logstash。
1input {
2 kafka {
3 bootstrap_servers => "192.168.1.11:9092,192.168.1.12:9092,192.168.1.13:9092"
4 client_id => "test1"
5 topics => ["test1"]
6 type => "test1"
7 codec => 'json'
8 }
9}
10
11output {
12 elasticsearch {
13 #action => "index"
14 hosts => ["localhost:9200"]
15 index => "%{appname}-%{+YYYY.MM.dd}" #根据项目名称动态创建索引
16 template => "/home/elasticsearch-6.3.1/config/templates/logstash.json" 索引模板地址
17 manage_template => false #关闭logstash默认索引模板
18 template_name => "crawl" #映射模板的名字
19 template_overwrite => true #如果设置为true,模板名字一样的时候,新的模板会覆盖旧的模板
20 }
21}

自研插件

可以尝试通过多进程和多线程实现消费者的功能,基本思路如下:
一个线程:负责获取kafka指定topic的数据,格式化处理后,导入进程间的Queue1中。
一个线程:启动一个进程池,各子进程通过获取进程间Queue1的数据,组装出待导入es的格式数据,导入进程间的Queue2中。
一个线程:负责读取Queue2中的数据导入ES。
主进程:负载3个线程的切换。
注:后续代码不是按这个思路来的,只是简单的实现kafka的读取和ES的导入,不涉及以上所讲模式。

消费者从kafka到es传输的全部代码
1#!/usr/bin/python
2# -*- coding: UTF-8 -*-
3from pykafka import KafkaClient
4import logging
5import logging.config
6from ConfigUtil import ConfigUtil
7import datetime
8
9class KafkaPython:
10 logging.config.fileConfig("logging.conf")
11 logger = logging.getLogger("msg")
12 logger_data = logging.getLogger("data")
13
14 def __init__(self):
15 self.server = ConfigUtil().get("kafka","kafka_server")
16 self.topic = ConfigUtil().get("kafka","topic")
17 self.group = ConfigUtil().get("kafka","group")
18 self.partition_id = int(ConfigUtil().get("kafka","partition"))
19 self.consumer_timeout_ms = int(ConfigUtil().get("kafka","consumer_timeout_ms"))
20 self.consumer = None
21 self.hosts = ConfigUtil().get("es","hosts")
22 self.index_name = ConfigUtil().get("es","index_name")
23 self.type_name = ConfigUtil().get("es","type_name")
24
25
26 def getConnect(self):
27 client = KafkaClient(self.server)
28 topic = client.topics[self.topic]
29 p = topic.partitions
30 ps={p.get(self.partition_id)}
31
32 self.consumer = topic.get_simple_consumer(
33 consumer_group=self.group,
34 auto_commit_enable=True,
35 consumer_timeout_ms=self.consumer_timeout_ms,
36 # num_consumer_fetchers=1,
37 # consumer_id='test1',
38 partitions=ps
39 )
40 self.starttime = datetime.datetime.now()
41
42 def beginConsumer(self):
43 print("beginConsumer kafka-python")
44 imprtEsData = ImportEsData(self.hosts,self.index_name,self.type_name)
45 #创建ACTIONS
46 count = 0
47 ACTIONS = []
48
49 while True:
50 endtime = datetime.datetime.now()
51 print (endtime - self.starttime).seconds
52 for message in self.consumer:
53 if message is not None:
54 try:
55 count = count + 1
56 # print(str(message.partition.id)+","+str(message.offset)+","+str(count))
57 # self.logger.info(str(message.partition.id)+","+str(message.offset)+","+str(count))
58 action = {
59 "_index": self.index_name,
60 "_type": self.type_name,
61 "_source": message.value
62 }
63 ACTIONS.append(action)
64 if len(ACTIONS) >= 10000:
65 imprtEsData.set_date(ACTIONS)
66 ACTIONS = []
67 self.consumer.commit_offsets()
68 endtime = datetime.datetime.now()
69 print (endtime - self.starttime).seconds
70 #break
71 except (Exception) as e:
72 # self.consumer.commit_offsets()
73 print(e)
74 self.logger.error(e)
75 self.logger.error(str(message.partition.id)+","+str(message.offset)+","+message.value+"\n")
76 # self.logger_data.error(message.value+"\n")
77 # self.consumer.commit_offsets()
78
79 if len(ACTIONS) > 0:
80 self.logger.info("等待时间超过,consumer_timeout_ms,把集合数据插入es")
81 imprtEsData.set_date(ACTIONS)
82 ACTIONS = []
83 self.consumer.commit_offsets()
84
85 def disConnect(self):
86 self.consumer.close()
87
88
89from elasticsearch import Elasticsearch
90from elasticsearch.helpers import bulk
91class ImportEsData:
92
93 logging.config.fileConfig("logging.conf")
94 logger = logging.getLogger("msg")
95
96 def __init__(self,hosts,index,type):
97 self.es = Elasticsearch(hosts=hosts.strip(',').split(','), timeout=5000)
98 self.index = index
99 self.type = type
100
101 def set_date(self,data):
102 # 批量处理
103 success = bulk(self.es, data, index=self.index, raise_on_error=True)
104 self.logger.info(success)
105
106
107if __name__ == '__main__':
108 kp = KafkaPython()
109 kp.getConnect()
110 kp.beginConsumer()

文章转载自北漂悟道之路,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。






