暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

python脚本实现opensearch集群按照存储空间分配分片策略

原创 zayki 2023-11-09
597

Opensearch集群现状,为了节省空间,并且系统每天产生的索引大小也不大,所以配置索引模板的分片数量策略为1主0备(生产环境不建议采用这种配置),集群分片的分配策略配置如下:
cluster.routing.allocation.balance.index:0.55
cluster.routing.allocation.balance.shard:0.45
cluster.routing.allocation.balance.threshold:1.0
权重都是根据索引或者分片数量经过特定路由算法来实现分片分配,这样导致三个集群节点的存储空间不均衡,经常是某个节点的存储空间达到告警阀值。所以想到了预创建索引方案,并指定到自己想要分配的节点。实现的python脚本如下,路过的看官如果有更好的方案可以留言给我。

#!/usr/bin/evn python3 # -*-coding:utf-8-*- # Author: zyjsuper # Filename: create_indic_task.py # CreateTime: 2023/11/08 15:10 from opensearchpy import OpenSearch import time import optparse import warnings from email.mime.text import MIMEText from email.header import Header import smtplib from datetime import datetime,timedelta warnings.filterwarnings('ignore') class create_indic_task: def __init__(self, es_ip, username, password): self.es_ip = es_ip self.username = username self.password = password self.es = OpenSearch( self.es_ip, verify_certs = False, verify_hostname = False, ca_certs='root-ca.pem', http_auth=(self.username, self.password), scheme="https", port=9200, ) self.pattern_lst = [] self.date_lst = [] def send_mail(self,content): # 发送邮件 sender = 'monitor@test.com.cn' receiver = 'zyjsuper@test.com.cn' message = MIMEText(content, 'plain', 'utf-8') message['From'] = Header('monitor', 'utf-8') message['To'] = Header(receiver, 'utf-8') subject = 'OpenSearch预创建索引失败' message['Subject'] = Header(subject, 'utf-8') smtpObj = smtplib.SMTP('email.test.com.cn', port=25) try: smtpObj.sendmail(sender, receiver, message.as_string()) print("Send mail successfully.") except Exception as err: print(str(err)) exit def get_min_diskpercent_node(self): try: disk_usage = {} nodes = self.es.cat.allocation(h='dp,host').split('\n') for record in nodes: if record != '': lst = record.split(' ') disk_usage[lst[1]] = int(lst[0]) ip_addr = min(disk_usage, key=disk_usage.get) print("The minimal disk usage node is:{0}".format(ip_addr)) return ip_addr except Exception as ex: print(str(ex)) def create_indic(self,ip): # 获取当前日期和时间 now = datetime.now() # 计算明天的日期和时间 tomorrow = now + timedelta(days=1) tomorrow_date = datetime.strftime(tomorrow, '%Y-%m-%d').replace('-', '.') index_name = 'logstash-' + tomorrow_date try: self.es.indices.create(index_name) print("Create index {0} successuflly.".format(index_name)) except Exception as e: self.send_mail("错误原因为:{0}\nip为:{1}".format(str(e),ip)) print("Create index failed,the error is:{0}".format(str(e))) exit() current_node = self.es.cat.shards(index_name,h='node').split('\n')[0] try: for node in self.es.nodes.info()['nodes'].values(): if node['host'] == ip: # 替换为你要指定的节点的IP地址 print("Move index {0} from {1} to {2}".format(index_name,current_node,node['name'])) self.es.cluster.reroute( body={ 'commands': [ { 'move': { 'index': index_name, 'shard': 0, 'from_node': current_node, 'to_node': node['name'] } } ] } ) except Exception as e: print("Move index failed,the error is:{0}".format(str(e))) exit() if __name__ == '__main__': start_time = time.time() usage = "Usage: create_indic_task -s ip -u admin -p password" CommandParser = optparse.OptionParser(usage, version="create_indic_task 1.0 By Zyjsuper.", add_help_option=False) CommandParser.add_option("-s", "--elastic", help="The elasticsearch node's ip address.", action="store") CommandParser.add_option("-u", "--username", help="Login user with admin privliege..", action="store") CommandParser.add_option("-p", "--password", help="Login password for admin user.", action='store') (args, _) = CommandParser.parse_args() if args.elastic is not None and args.username is not None and args.password is not None: obj = create_indic_task(args.elastic,args.username,args.password) min_diskusage_node_ip = obj.get_min_diskpercent_node() obj.create_indic(min_diskusage_node_ip) else: CommandParser.print_help()
最后修改时间:2023-11-09 08:51:56
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论