一、创建webhook脚本
# -*- coding: utf-8 -*-
"""
====================================
@File Name :email_webhook.py
@Time : 2023/10/27 14:55
@Program IDE :PyCharm
@Create by Author :zayki
====================================
"""
import smtplib
import time
from os import popen
from email.mime.text import MIMEText
from email.header import Header
from flask import Flask, request
app = Flask(__name__)
def send_mail(content):
# content=request.data
# print(content)
# 发送邮件
sender = 'sysmonitor@test.com.cn'
receiver = 'zhaoyuanji@test.com.cn'
message = MIMEText(content,'plain', 'utf-8')
message['From'] = Header('sysmonitor','utf-8')
message['To'] = Header(receiver, 'utf-8')
subject = 'Elasticsearch测试集群监控'
message['Subject'] = Header(subject, 'utf-8')
smtpObj = smtplib.SMTP('email.test.com.cn',port=25)
try:
smtpObj.sendmail(sender, receiver, message.as_string())
print("Send mail successfully.")
except Exception as err:
print(str(err))
@app.route("/send_mail", methods=["POST"])
def send():
monopt=request.args["monopt"]
client=request.remote_addr
if monopt=="tookvalue":
content = "近一分钟内命中率低于500个,请检查集群状态。集群IP地址为:{0}".format(client)
popen("systemctl restart logstash")
logstash_status=popen("systemctl status logstash").read().strip()
print(logstash_status)
if monopt=="taskcount":
content = "近一分钟内执行的task数量低于5个,请检查集群状态。集群IP地址为:{0}".format(client)
send_mail(content)
return "Successfully."
if __name__ == "__main__":
app.run("0.0.0.0", "5000")
二、配置告警通知方式为webhook
如图所示,按照步骤点击。

来到通知目标地址的配置:

三、配置监控器,关联通知
如图所示,按照步骤点击:

进入到Monitors监控器配置页面:

四、创建触发器Trigger
上一步点击“Create”按钮之后,会来到如下页面,配置如图所示:

附:如何进入到trigger配置页面:

五、简单方法实现类似功能
- 使用python创建如下脚本:
#!/usr/bin/evn python3
# -*-coding:utf-8-*-
# Author: zyjsuper
# Filename: mon_total_hits_alert.py
# CreateTime: 2023/11/1 10:42
import os
import random
import requests
import json
import smtplib
from email.mime.text import MIMEText
from email.header import Header
import base64 as b64
from warnings import filterwarnings
filterwarnings('ignore')
class monitor_total_hits_alert:
def __init__(self,username,password):
auth=b64.b64encode(('{0}:{1}'.format(username,password)).encode('utf-8')).decode('utf-8')
es_cluster = [ '20.1.5.114','20.1.5.146','20.1.5.197' ]
self.url = 'https://{0}:9200/*/_search?size=0'.format(es_cluster[random.randint(0,2)])
self.json_request = '''{
"track_total_hits": true,
"query": {
"range": {
"@timestamp": {
"from": "now-1m",
"to": "now",
"format": "yyyy-MM-dd HH:mm:ss.SSS",
"include_lower": true,
"include_upper": true,
"boost": 1
}
}
}
}'''
self.headers = {
'Content-Type': 'application/json',
'Authorization': 'Basic {0}'.format(auth)
}
def get_total_hits(self):
response = requests.post(self.url, headers = self.headers , data = self.json_request,verify=False)
print("本次请求得到的响应数据为:{0}".format(response.text))
return json.loads(response.text)
def send_mail(self,subject, content):
sender = 'sysmonitor@test.com.cn'
receiver = 'zyjsuper@test.com.cn'
message = MIMEText(content, 'plain', 'utf-8')
message['From'] = Header('sysmonitor', 'utf-8')
message['To'] = Header(receiver, 'utf-8')
# subject = 'OpenSearch集群监控'
message['Subject'] = Header(subject, 'utf-8')
smtpObj = smtplib.SMTP('email.test.com.cn', port=25)
try:
smtpObj.sendmail(sender, receiver, message.as_string())
print("Send mail successfully.")
except Exception as err:
print(str(err))
def restart_logstash(self):
os.popen('systemctl restart logstash')
if __name__ == '__main__':
obj = monitor_total_hits_alert("admin","admin")
result = obj.get_total_hits()
total_hits = result['hits']['total']['value']
print("本次查询的命中次数为:{0}".format(total_hits))
if total_hits < 500:
obj.send_mail("ElasticSearch集群监控","近一分钟内命中率低于500个,请检查集群状态。")
else:
pass
- 定义crontab每隔十五分钟执行:
[root@zht-logstash-pro-app-1 ~]# crontab -l
*/15 * * * * /usr/bin/python3 /root/scripts/mon_total_hits_alert.py
六、使用fastapi完成的webhook脚本
#!/usr/bin/env python3
# -*-coding:utf-8-*-
# Author: zyjsuper
# Filename: webhook.py
# CreateTime: 2024/7/4 14:37
import uvicorn
import smtplib
from os import popen
from email.mime.text import MIMEText
from email.header import Header
from fastapi import FastAPI,Request
import fastapi_cdn_host
app = FastAPI()
def send_mail(content):
# 发送邮件
sender = 'sysmonitor@test.com.cn'
receiver = 'zhaoyuanji@test.com.cn'
message = MIMEText(content,'plain', 'utf-8')
message['From'] = Header('sysmonitor','utf-8')
message['To'] = Header(receiver, 'utf-8')
subject = 'OpenSearch测试集群监控'
message['Subject'] = Header(subject, 'utf-8')
smtpObj = smtplib.SMTP('email.test.com.cn',port=25)
try:
smtpObj.sendmail(sender, receiver, message.as_string())
print("Send mail successfully.")
except Exception as err:
print(str(err))
@app.post("/send_mail/{item_id}")
def send(item_id: int, request: Request):
item_id = item_id # 1代表生产通知,2代表测试通知
client_host = request.client.host
if item_id == 1: # 访问http://192.168.128.2:9000/send_mail/1触发
content = "近一分钟内命中率低于500个,请检查集群状态。来源IP地址为:{0}".format(client_host)
popen("systemctl restart logstash")
logstash_status = popen("systemctl status logstash").read().strip()
print(logstash_status)
send_mail(content)
elif item_id == 2: # 访问http://192.168.128.2:9000/send_mail/2触发
content = "近一分钟内执行的task数量低于50个,请检查集群状态。来源IP地址为:{0}".format(client_host)
popen("systemctl restart logstash")
logstash_status = popen("systemctl status logstash").read().strip()
print(logstash_status)
send_mail(content)
else:
pass
return {"client_host": client_host }
if __name__ == '__main__':
fastapi_cdn_host.patch_docs(app)
uvicorn.run(app, host="192.168.128.2", port=9000)
最后修改时间:2024-07-05 08:49:53
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




