For a large index idx1 containing 49,000,000 documents, the rebuild needs careful attention to performance and resource usage. The strategies and suggestions below can help you complete the task efficiently.
Optimization strategies
1. Run _reindex in batches
Setting the size parameter inside source controls how many documents each batch processes, which avoids loading too much data at once and exhausting memory or degrading cluster performance.
POST /_reindex
{
  "source": {
    "index": "idx1",
    "size": 10000  // process 10,000 documents per batch
  },
  "dest": {
    "index": "idx2"
  }
}
2. Use the Scroll API and Bulk API
If _reindex puts too much load on the cluster, you can export and import the data in batches yourself with the scroll and bulk APIs.
2.1 Export the data and write it to files in batches
import requests
import json
import os
# Elasticsearch configuration
es_host = "http://localhost:9200"
index_name = "idx1"
scroll_timeout = "2m"  # how long each scroll context is kept alive
batch_size = 10000  # number of documents fetched per request
output_dir = "exported_data"

# Create the output directory
os.makedirs(output_dir, exist_ok=True)

# Open the initial scroll search
url = f"{es_host}/{index_name}/_search?scroll={scroll_timeout}&size={batch_size}"
payload = {
    "query": {
        "match_all": {}
    }
}
response = requests.post(url, headers={"Content-Type": "application/json"}, data=json.dumps(payload))
scroll_id = response.json()["_scroll_id"]
hits = response.json()["hits"]["hits"]

# File counter
file_counter = 0

# Write the first batch to a file
with open(os.path.join(output_dir, f"part_{file_counter}.json"), "w") as f:
    for doc in hits:
        f.write(json.dumps(doc) + "\n")

# Keep scrolling until no more documents come back
while len(hits) > 0:
    url = f"{es_host}/_search/scroll"
    payload = {
        "scroll": scroll_timeout,
        "scroll_id": scroll_id
    }
    response = requests.post(url, headers={"Content-Type": "application/json"}, data=json.dumps(payload))
    scroll_id = response.json()["_scroll_id"]  # the scroll id may change between requests
    hits = response.json()["hits"]["hits"]
    if len(hits) > 0:
        file_counter += 1
        with open(os.path.join(output_dir, f"part_{file_counter}.json"), "w") as f:
            for doc in hits:
                f.write(json.dumps(doc) + "\n")

# Release the scroll context once the export is done
requests.delete(f"{es_host}/_search/scroll",
                headers={"Content-Type": "application/json"},
                data=json.dumps({"scroll_id": scroll_id}))
2.2 Bulk-import the data
import requests
import json
import os
# Elasticsearch configuration
es_host = "http://localhost:9200"
target_index = "idx2"
input_dir = "exported_data"
batch_size = 10000  # number of documents per bulk request

# Collect the exported files
files = [f for f in os.listdir(input_dir) if f.endswith('.json')]

def process_file(file_path):
    with open(file_path, "r") as f:
        lines = f.readlines()
    for i in range(0, len(lines), batch_size):
        batch_lines = lines[i:i + batch_size]
        bulk_data = []
        for line in batch_lines:
            doc = json.loads(line)
            # Re-use the original document id so re-running the import overwrites instead of duplicating
            bulk_data.append(json.dumps({"index": {"_index": target_index, "_id": doc["_id"]}}))
            bulk_data.append(json.dumps(doc["_source"]))
        bulk_payload = "\n".join(bulk_data) + "\n"
        url = f"{es_host}/_bulk"
        response = requests.post(url, headers={"Content-Type": "application/x-ndjson"}, data=bulk_payload)
        # The bulk API returns 200 even when individual items fail, so also check the "errors" flag
        if response.status_code != 200 or response.json().get("errors"):
            print(f"Error importing {file_path}: {response.text}")

# Process the files in parallel with a thread pool
from concurrent.futures import ThreadPoolExecutor, as_completed

with ThreadPoolExecutor(max_workers=5) as executor:
    futures = [executor.submit(process_file, os.path.join(input_dir, file)) for file in files]
    for future in as_completed(futures):
        try:
            future.result()
        except Exception as e:
            print(f"Exception occurred: {e}")
3. Tune the _reindex parameters
Use the requests_per_second parameter to cap the request rate and reduce pressure on the cluster, and slices to split the job into parallel slices. Both are URL query parameters rather than request-body fields:
POST /_reindex?slices=5&requests_per_second=100
{
  "source": {
    "index": "idx1"
  },
  "dest": {
    "index": "idx2"
  }
}
Use the wait_for_completion parameter
If you do not want to wait for the _reindex operation to finish, set wait_for_completion to false so it runs asynchronously in the background:
POST /_reindex?wait_for_completion=false
{
"source": {
"index": "idx1"
},
"dest": {
"index": "idx2"
}
}
You can then monitor progress with the task ID returned in the response:
GET /_tasks/task_id
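For example, a minimal polling sketch (the task ID below is a placeholder; use the "task" value returned by the asynchronous _reindex call):

import time
import requests

es_host = "http://localhost:9200"
task_id = "node_id:12345"  # placeholder: replace with the real task ID

while True:
    info = requests.get(f"{es_host}/_tasks/{task_id}").json()
    status = info.get("task", {}).get("status", {})
    print(f"created={status.get('created')} / total={status.get('total')}")
    if info.get("completed"):
        break
    time.sleep(30)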
4. Monitor cluster health
While this large operation runs, check cluster health regularly to make sure it is not putting too much pressure on the production environment:
GET /_cluster/health
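As a rough sketch, the export/import scripts above could call a helper like this between batches and back off when the cluster is struggling (what counts as "struggling" is up to you; the field names follow the standard _cluster/health response):

import requests

es_host = "http://localhost:9200"

def cluster_is_healthy():
    # Prints a one-line summary and returns False when the cluster is red.
    health = requests.get(f"{es_host}/_cluster/health").json()
    print(f"status={health['status']} relocating_shards={health['relocating_shards']} "
          f"pending_tasks={health['number_of_pending_tasks']}")
    return health["status"] != "red"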
5. Increase the replica count
Once the new index is fully built, increase the number of replicas as needed to improve availability and search performance:
PUT /idx2/_settings
{
"number_of_replicas": 1
}
Estimating how long it will take
Assuming 10,000 documents per request and 49,000,000 documents in total:
• about 4,900 requests are needed (49,000,000 / 10,000);
• if each request takes about 1 second, that is roughly 4,900 seconds, or about 1.36 hours.
In practice the optimizations above may push overall throughput higher.
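The same estimate, spelled out as a quick calculation (the one-second-per-request figure is an assumption, not a measurement):

total_docs = 49_000_000
batch_size = 10_000
seconds_per_request = 1  # assumed average round-trip time per batch

requests_needed = total_docs / batch_size              # 4,900 requests
estimated_hours = requests_needed * seconds_per_request / 3600
print(requests_needed, round(estimated_hours, 2))      # 4900.0 1.36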
Complete example scripts
The scripts below show the full scroll-export and bulk-import workflow for rebuilding the index:
Export the data
import requests
import json
import os
# Elasticsearch configuration
es_host = "http://localhost:9200"
index_name = "idx1"
scroll_timeout = "2m"  # how long each scroll context is kept alive
batch_size = 10000  # number of documents fetched per request
output_dir = "exported_data"

# Create the output directory
os.makedirs(output_dir, exist_ok=True)

# Open the initial scroll search
url = f"{es_host}/{index_name}/_search?scroll={scroll_timeout}&size={batch_size}"
payload = {
    "query": {
        "match_all": {}
    }
}
response = requests.post(url, headers={"Content-Type": "application/json"}, data=json.dumps(payload))
scroll_id = response.json()["_scroll_id"]
hits = response.json()["hits"]["hits"]

# File counter
file_counter = 0

# Write the first batch to a file
with open(os.path.join(output_dir, f"part_{file_counter}.json"), "w") as f:
    for doc in hits:
        f.write(json.dumps(doc) + "\n")

# Keep scrolling until no more documents come back
while len(hits) > 0:
    url = f"{es_host}/_search/scroll"
    payload = {
        "scroll": scroll_timeout,
        "scroll_id": scroll_id
    }
    response = requests.post(url, headers={"Content-Type": "application/json"}, data=json.dumps(payload))
    scroll_id = response.json()["_scroll_id"]  # the scroll id may change between requests
    hits = response.json()["hits"]["hits"]
    if len(hits) > 0:
        file_counter += 1
        with open(os.path.join(output_dir, f"part_{file_counter}.json"), "w") as f:
            for doc in hits:
                f.write(json.dumps(doc) + "\n")

# Release the scroll context once the export is done
requests.delete(f"{es_host}/_search/scroll",
                headers={"Content-Type": "application/json"},
                data=json.dumps({"scroll_id": scroll_id}))
Import the data
import requests
import json
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
# Elasticsearch configuration
es_host = "http://localhost:9200"
target_index = "idx2"
input_dir = "exported_data"
batch_size = 1000  # number of documents per bulk request

# Collect the exported files
files = [f for f in os.listdir(input_dir) if f.endswith('.json')]

def process_file(file_path):
    with open(file_path, "r") as f:
        lines = f.readlines()
    for i in range(0, len(lines), batch_size):
        batch_lines = lines[i:i + batch_size]
        bulk_data = []
        for line in batch_lines:
            doc = json.loads(line)
            # Re-use the original document id so re-running the import overwrites instead of duplicating
            bulk_data.append(json.dumps({"index": {"_index": target_index, "_id": doc["_id"]}}))
            bulk_data.append(json.dumps(doc["_source"]))
        bulk_payload = "\n".join(bulk_data) + "\n"
        url = f"{es_host}/_bulk"
        response = requests.post(url, headers={"Content-Type": "application/x-ndjson"}, data=bulk_payload)
        # The bulk API returns 200 even when individual items fail, so also check the "errors" flag
        if response.status_code != 200 or response.json().get("errors"):
            print(f"Error importing {file_path}: {response.text}")

# Process the files in parallel with a thread pool
with ThreadPoolExecutor(max_workers=5) as executor:
    futures = [executor.submit(process_file, os.path.join(input_dir, file)) for file in files]
    for future in as_completed(futures):
        try:
            future.result()
        except Exception as e:
            print(f"Exception occurred: {e}")
Renaming the temporary index to idx1
Elasticsearch cannot rename an index directly, but you can get the same effect with an alias, or by deleting the old index and recreating it under the original name.
Method 1: use an alias
Point an alias named idx1 at idx2 (the temporary index built above) so applications can keep accessing the data through the name idx1. Because an alias cannot share a name with an existing concrete index, the original idx1 must be removed first, then the alias added:
POST /_aliases
{
  "actions": [
    { "add": { "index": "idx2", "alias": "idx1" } }
  ]
}
If the name idx1 is already an alias (pointing at an older physical index, called idx1_old here purely for illustration), remove the old mapping and add the new one in a single atomic request:
POST /_aliases
{
  "actions": [
    { "remove": { "index": "idx1_old", "alias": "idx1" } },
    { "add": { "index": "idx2", "alias": "idx1" } }
  ]
}
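For the delete-and-recreate route mentioned above, a minimal sketch looks like this (an assumption-heavy outline rather than a drop-in script: it presumes idx2 is complete and verified, and idx1 is unavailable between the DELETE and the end of the copy):

import requests

es_host = "http://localhost:9200"

# 1. Drop the original index (only after verifying idx2 and, ideally, taking a snapshot).
requests.delete(f"{es_host}/idx1")

# 2. Recreate idx1 with the desired settings/mappings here, otherwise dynamic mapping applies.

# 3. Copy the documents from idx2 back under the old name, asynchronously.
body = {"source": {"index": "idx2"}, "dest": {"index": "idx1"}}
resp = requests.post(f"{es_host}/_reindex?wait_for_completion=false", json=body)
print(resp.json())  # contains the task ID to monitor, as in section 3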




