
Rebuilding a Large Elasticsearch Index with 400 Million Documents (ES 7.10.0)

Original article by virvle · 2025-02-19

For a large index idx1 containing 49,000,000 documents, reindexing requires careful attention to performance and resource usage. The strategies and recommendations below will help you complete the task efficiently.

Optimization Strategies

1. Run _reindex in Batches

Setting the size parameter to control how many documents each batch processes avoids loading too much data at once, which could exhaust memory or degrade cluster performance:


POST /_reindex
{
  "source": {
    "index": "idx1",
    "size": 10000      // process 10,000 documents per batch
  },
  "dest": {
    "index": "idx2"
  }
}

2. Use the Scroll API and the Bulk API

If the _reindex operation puts too much pressure on your cluster, you can instead use the scroll and bulk APIs manually to export and import the data in batches.

2.1 Export the data and write it to files in batches

import json
import os

import requests

# Elasticsearch configuration
es_host = "http://localhost:9200"
index_name = "idx1"
scroll_timeout = "2m"   # how long to keep the scroll context alive
batch_size = 10000      # number of documents fetched per request
output_dir = "exported_data"

# Create the output directory
os.makedirs(output_dir, exist_ok=True)

# Open the initial scroll search
url = f"{es_host}/{index_name}/_search?scroll={scroll_timeout}&size={batch_size}"
payload = {"query": {"match_all": {}}}
response = requests.post(url, headers={"Content-Type": "application/json"}, data=json.dumps(payload))
scroll_id = response.json()["_scroll_id"]
hits = response.json()["hits"]["hits"]

# File counter
file_counter = 0

# Write the first batch
with open(os.path.join(output_dir, f"part_{file_counter}.json"), "w") as f:
    for doc in hits:
        f.write(json.dumps(doc) + "\n")

# Keep scrolling until no more documents are returned
while len(hits) > 0:
    url = f"{es_host}/_search/scroll"
    payload = {"scroll": scroll_timeout, "scroll_id": scroll_id}
    response = requests.post(url, headers={"Content-Type": "application/json"}, data=json.dumps(payload))
    result = response.json()
    scroll_id = result["_scroll_id"]   # the scroll ID can change between calls
    hits = result["hits"]["hits"]
    if len(hits) > 0:
        file_counter += 1
        with open(os.path.join(output_dir, f"part_{file_counter}.json"), "w") as f:
            for doc in hits:
                f.write(json.dumps(doc) + "\n")

# Release the scroll context once the export is finished
requests.delete(f"{es_host}/_search/scroll",
                headers={"Content-Type": "application/json"},
                data=json.dumps({"scroll_id": scroll_id}))

2.2 Bulk import the data

import json
import os
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests

# Elasticsearch configuration
es_host = "http://localhost:9200"
target_index = "idx2"
input_dir = "exported_data"
batch_size = 10000  # number of documents per bulk request

# Collect all exported files
files = [f for f in os.listdir(input_dir) if f.endswith(".json")]

def process_file(file_path):
    with open(file_path, "r") as f:
        lines = f.readlines()
    for i in range(0, len(lines), batch_size):
        batch_lines = lines[i:i + batch_size]
        bulk_data = []
        for line in batch_lines:
            doc = json.loads(line)
            # One action line followed by one source line, as the bulk NDJSON format requires
            bulk_data.append(json.dumps({"index": {"_index": target_index, "_id": doc["_id"]}}))
            bulk_data.append(json.dumps(doc["_source"]))
        bulk_payload = "\n".join(bulk_data) + "\n"
        url = f"{es_host}/_bulk"
        response = requests.post(url, headers={"Content-Type": "application/x-ndjson"}, data=bulk_payload)
        if response.status_code != 200:
            print(f"Error importing {file_path}: {response.text}")

# Process the files in parallel with a thread pool
with ThreadPoolExecutor(max_workers=5) as executor:
    futures = [executor.submit(process_file, os.path.join(input_dir, file)) for file in files]
    for future in as_completed(futures):
        try:
            future.result()
        except Exception as e:
            print(f"Exception occurred: {e}")
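
If the official Python client is available, the same import can also be written with its bulk helper, which takes care of the NDJSON formatting and chunking. This is only a hedged alternative sketch (it assumes elasticsearch-py 7.x is installed and that the exported files from section 2.1 exist); the requests-based script above is the approach this article follows.

# Alternative sketch using elasticsearch-py 7.x (not part of the original scripts)
import json
import os

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

es = Elasticsearch("http://localhost:9200")
input_dir = "exported_data"
target_index = "idx2"

def actions():
    # Yield one bulk action per exported document
    for name in os.listdir(input_dir):
        if not name.endswith(".json"):
            continue
        with open(os.path.join(input_dir, name), "r") as f:
            for line in f:
                doc = json.loads(line)
                yield {"_index": target_index, "_id": doc["_id"], "_source": doc["_source"]}

success, errors = bulk(es, actions(), chunk_size=1000, raise_on_error=False)
print(f"indexed: {success}, errors: {len(errors)}")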

3. Tune the _reindex Parameters

Adding the requests_per_second parameter throttles the operation to reduce pressure on the cluster, and slices splits it into parallel slices. In ES 7.10 both are URL query parameters rather than request-body fields:

POST /_reindex?slices=5&requests_per_second=100
{
  "source": {
    "index": "idx1"
  },
  "dest": {
    "index": "idx2"
  }
}

Use the wait_for_completion parameter

If you do not want to wait for the _reindex operation to finish, set wait_for_completion to false so that it runs asynchronously in the background:

POST /_reindex?wait_for_completion=false
{
  "source": {
    "index": "idx1"
  },
  "dest": {
    "index": "idx2"
  }
}

The response contains a task ID that you can then use to monitor progress:

GET /_tasks/task_id
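
For example, a minimal polling sketch (the task ID below is a hypothetical placeholder; use the value returned by the asynchronous _reindex call) could look like this:

import time

import requests

es_host = "http://localhost:9200"
task_id = "oTUltX4IQMOUUVeiohTt8A:12345"  # hypothetical placeholder

while True:
    resp = requests.get(f"{es_host}/_tasks/{task_id}").json()
    status = resp.get("task", {}).get("status", {})
    # Reindex tasks report counters such as total and created in their status
    print(f"created: {status.get('created', 0)} / total: {status.get('total', 0)}")
    if resp.get("completed"):
        break
    time.sleep(30)  # check every 30 seconds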

4. Monitor Cluster Health

While the large-scale operation is running, check the cluster health periodically to make sure it is not putting excessive pressure on the production environment:

GET /_cluster/health
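
A small monitoring sketch along these lines (the 30-second interval and the warning-only behaviour are arbitrary choices, not part of the original procedure) might be:

import time

import requests

es_host = "http://localhost:9200"

while True:
    health = requests.get(f"{es_host}/_cluster/health").json()
    status = health.get("status")
    if status != "green":
        # Consider pausing or slowing the import when the cluster degrades
        print(f"Warning: cluster status is {status}")
    time.sleep(30)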

5. Increase the Number of Replicas

Once the new index has been built, you can increase the number of replicas as needed to improve availability and search performance:

PUT /idx2/_settings
{
  "number_of_replicas": 1
}

Estimating the Operation Time

Assuming each request processes 10,000 records and there are 49,000,000 records in total:
• About 4,900 requests are needed (49,000,000 / 10,000).
• If each request takes 1 second, the total is about 4,900 seconds, roughly 1.36 hours.
In practice, the optimizations described above should improve overall throughput.
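
The same back-of-the-envelope calculation, expressed as a quick script:

total_docs = 49_000_000
batch_size = 10_000
seconds_per_request = 1  # assumption taken from the estimate above

requests_needed = total_docs / batch_size               # 4,900 requests
total_hours = requests_needed * seconds_per_request / 3600
print(f"{requests_needed:.0f} requests, about {total_hours:.2f} hours")  # ~1.36 hours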

Complete Example Scripts

The complete example scripts below show how to use scroll queries and bulk imports to optimize the reindexing process.

Export the data

import json
import os

import requests

# Elasticsearch configuration
es_host = "http://localhost:9200"
index_name = "idx1"
scroll_timeout = "2m"   # how long to keep the scroll context alive
batch_size = 10000      # number of documents fetched per request
output_dir = "exported_data"

# Create the output directory
os.makedirs(output_dir, exist_ok=True)

# Open the initial scroll search
url = f"{es_host}/{index_name}/_search?scroll={scroll_timeout}&size={batch_size}"
payload = {"query": {"match_all": {}}}
response = requests.post(url, headers={"Content-Type": "application/json"}, data=json.dumps(payload))
scroll_id = response.json()["_scroll_id"]
hits = response.json()["hits"]["hits"]

# File counter
file_counter = 0

# Write the first batch
with open(os.path.join(output_dir, f"part_{file_counter}.json"), "w") as f:
    for doc in hits:
        f.write(json.dumps(doc) + "\n")

# Keep scrolling until no more documents are returned
while len(hits) > 0:
    url = f"{es_host}/_search/scroll"
    payload = {"scroll": scroll_timeout, "scroll_id": scroll_id}
    response = requests.post(url, headers={"Content-Type": "application/json"}, data=json.dumps(payload))
    result = response.json()
    scroll_id = result["_scroll_id"]   # the scroll ID can change between calls
    hits = result["hits"]["hits"]
    if len(hits) > 0:
        file_counter += 1
        with open(os.path.join(output_dir, f"part_{file_counter}.json"), "w") as f:
            for doc in hits:
                f.write(json.dumps(doc) + "\n")

# Release the scroll context once the export is finished
requests.delete(f"{es_host}/_search/scroll",
                headers={"Content-Type": "application/json"},
                data=json.dumps({"scroll_id": scroll_id}))

Import the data

import json
import os
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests

# Elasticsearch configuration
es_host = "http://localhost:9200"
target_index = "idx2"
input_dir = "exported_data"
batch_size = 1000  # number of documents per bulk request

# Collect all exported files
files = [f for f in os.listdir(input_dir) if f.endswith(".json")]

def process_file(file_path):
    with open(file_path, "r") as f:
        lines = f.readlines()
    for i in range(0, len(lines), batch_size):
        batch_lines = lines[i:i + batch_size]
        bulk_data = []
        for line in batch_lines:
            doc = json.loads(line)
            # One action line followed by one source line, as the bulk NDJSON format requires
            bulk_data.append(json.dumps({"index": {"_index": target_index, "_id": doc["_id"]}}))
            bulk_data.append(json.dumps(doc["_source"]))
        bulk_payload = "\n".join(bulk_data) + "\n"
        url = f"{es_host}/_bulk"
        response = requests.post(url, headers={"Content-Type": "application/x-ndjson"}, data=bulk_payload)
        if response.status_code != 200:
            print(f"Error importing {file_path}: {response.text}")

# Process the files in parallel with a thread pool
with ThreadPoolExecutor(max_workers=5) as executor:
    futures = [executor.submit(process_file, os.path.join(input_dir, file)) for file in files]
    for future in as_completed(futures):
        try:
            future.result()
        except Exception as e:
            print(f"Exception occurred: {e}")

Renaming the Temporary Index to idx1

Elasticsearch cannot rename an index directly. You can achieve the same effect with an alias, or by deleting and recreating the index.

Option 1: Use an alias

Add an alias named idx1 to idx1_temp, so that applications can keep accessing the data through idx1:

POST /_aliases
{
  "actions": [
    {
      "add": {
        "index": "idx1_temp",
        "alias": "idx1"
      }
    }
  ]
}

If an alias already points to idx1, remove the old alias and add the new one in a single request:

POST /_aliases
{
  "actions": [
    {
      "remove": {
        "index": "idx1",
        "alias": "idx1"
      }
    },
    {
      "add": {
        "index": "idx1_temp",
        "alias": "idx1"
      }
    }
  ]
}
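
The actions inside a single _aliases request are applied atomically, so clients never observe a moment where the idx1 alias is missing. As a quick sanity check (a one-off sketch, not part of the original steps), you can confirm where the alias now points:

import requests

es_host = "http://localhost:9200"
resp = requests.get(f"{es_host}/_alias/idx1")
print(resp.json())  # expected shape: {"idx1_temp": {"aliases": {"idx1": {}}}}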

Last modified: 2025-02-20 10:37:43