Opensearch集群现状,为了节省空间,并且系统每天产生的索引大小也不大,所以配置索引模板的分片数量策略为1主0备(生产环境不建议采用这种配置),集群分片的分配策略配置如下:
cluster.routing.allocation.balance.index:0.55
cluster.routing.allocation.balance.shard:0.45
cluster.routing.allocation.balance.threshold:1.0
权重都是根据索引或者分片数量经过特定路由算法来实现分片分配,这样导致三个集群节点的存储空间不均衡,经常是某个节点的存储空间达到告警阀值。所以想到了预创建索引方案,并指定到自己想要分配的节点。实现的python脚本如下,路过的看官如果有更好的方案可以留言给我。
#!/usr/bin/evn python3
# -*-coding:utf-8-*-
# Author: zyjsuper
# Filename: create_indic_task.py
# CreateTime: 2023/11/08 15:10
from opensearchpy import OpenSearch
import time
import optparse
import warnings
from email.mime.text import MIMEText
from email.header import Header
import smtplib
from datetime import datetime,timedelta
warnings.filterwarnings('ignore')
class create_indic_task:
def __init__(self, es_ip, username, password):
self.es_ip = es_ip
self.username = username
self.password = password
self.es = OpenSearch(
self.es_ip,
verify_certs = False,
verify_hostname = False,
ca_certs='root-ca.pem',
http_auth=(self.username, self.password),
scheme="https",
port=9200,
)
self.pattern_lst = []
self.date_lst = []
def send_mail(self,content):
# 发送邮件
sender = 'monitor@test.com.cn'
receiver = 'zyjsuper@test.com.cn'
message = MIMEText(content, 'plain', 'utf-8')
message['From'] = Header('monitor', 'utf-8')
message['To'] = Header(receiver, 'utf-8')
subject = 'OpenSearch预创建索引失败'
message['Subject'] = Header(subject, 'utf-8')
smtpObj = smtplib.SMTP('email.test.com.cn', port=25)
try:
smtpObj.sendmail(sender, receiver, message.as_string())
print("Send mail successfully.")
except Exception as err:
print(str(err))
exit
def get_min_diskpercent_node(self):
try:
disk_usage = {}
nodes = self.es.cat.allocation(h='dp,host').split('\n')
for record in nodes:
if record != '':
lst = record.split(' ')
disk_usage[lst[1]] = int(lst[0])
ip_addr = min(disk_usage, key=disk_usage.get)
print("The minimal disk usage node is:{0}".format(ip_addr))
return ip_addr
except Exception as ex:
print(str(ex))
def create_indic(self,ip):
# 获取当前日期和时间
now = datetime.now()
# 计算明天的日期和时间
tomorrow = now + timedelta(days=1)
tomorrow_date = datetime.strftime(tomorrow, '%Y-%m-%d').replace('-', '.')
index_name = 'logstash-' + tomorrow_date
try:
self.es.indices.create(index_name)
print("Create index {0} successuflly.".format(index_name))
except Exception as e:
self.send_mail("错误原因为:{0}\nip为:{1}".format(str(e),ip))
print("Create index failed,the error is:{0}".format(str(e)))
exit()
current_node = self.es.cat.shards(index_name,h='node').split('\n')[0]
try:
for node in self.es.nodes.info()['nodes'].values():
if node['host'] == ip: # 替换为你要指定的节点的IP地址
print("Move index {0} from {1} to {2}".format(index_name,current_node,node['name']))
self.es.cluster.reroute(
body={
'commands': [
{
'move': {
'index': index_name,
'shard': 0,
'from_node': current_node,
'to_node': node['name']
}
}
]
}
)
except Exception as e:
print("Move index failed,the error is:{0}".format(str(e)))
exit()
if __name__ == '__main__':
start_time = time.time()
usage = "Usage: create_indic_task -s ip -u admin -p password"
CommandParser = optparse.OptionParser(usage, version="create_indic_task 1.0 By Zyjsuper.",
add_help_option=False)
CommandParser.add_option("-s", "--elastic", help="The elasticsearch node's ip address.", action="store")
CommandParser.add_option("-u", "--username", help="Login user with admin privliege..", action="store")
CommandParser.add_option("-p", "--password", help="Login password for admin user.", action='store')
(args, _) = CommandParser.parse_args()
if args.elastic is not None and args.username is not None and args.password is not None:
obj = create_indic_task(args.elastic,args.username,args.password)
min_diskusage_node_ip = obj.get_min_diskpercent_node()
obj.create_indic(min_diskusage_node_ip)
else:
CommandParser.print_help()
最后修改时间:2023-11-09 08:51:56
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




