关注下方公众号,获取更多热点资讯
ElasticSearch向量检索实战:从零构建大模型时代的高效检索系统
大模型时代的向量检索需求
随着人工智能技术的快速发展,大型语言模型(LLM)如GPT、Llama、Deepseek等已经成为各类智能应用的基础设施。这些模型虽然拥有强大的能力,但也面临着知识时效性、定制化内容、隐私保护等多方面的挑战。为了解决这些问题,"检索增强生成"(Retrieval-Augmented Generation,RAG)技术应运而生,成为大模型应用的重要范式。
在RAG架构中,向量数据库扮演着至关重要的角色。它存储了文档、知识或数据的向量表示,能够高效地检索与用户查询语义相关的信息。ElasticSearch作为一种成熟的搜索引擎,近年来也增强了对向量检索的支持,成为向量数据库的有力竞争者。
本文将深入探讨如何利用ElasticSearch构建一个完整的向量检索系统,详细分析每一行代码的实现原理、功能和优化策略,帮助读者全面理解大模型场景下的向量检索技术。
先看运行效果

向量检索基础理论
文本向量化原理
向量检索的核心是将文本转换为高维向量空间中的点,这一过程称为"向量化"或"嵌入",常说的 Embedding 模型就是干这个用的。现代嵌入模型(如BERT、USE等)能够捕捉文本的语义信息,使得语义相似的文本在向量空间中距离较近。
class OllamaEmbedding:
def __init__(self, model_name="llama2"):
self.model_name = model_name
self.api_url = "http://192.168.1.125:11434/api/embeddings"
logger.info(f"初始化 Ollama 嵌入模型: {model_name}, API: {self.api_url}")
我们采用Ollama提供的嵌入服务,可以灵活切换不同的大模型来生成文本的向量表示。
向量相似度度量
向量检索中常用的相似度度量包括:
1. 余弦相似度:计算两个向量夹角的余弦值,范围为[-1,1],越接近1表示越相似。 2. 欧几里得距离:计算向量间的直线距离,值越小表示越相似。 3. 点积:两个向量的内积,通常用于标准化向量。
本项目中,我们主要使用余弦相似度:
"script": {
"source": "cosineSimilarity(params.query_vector, 'content_vector') + 1.0",
"params": {"query_vector": query_vector}
}
注意这里统一将原始余弦相似度值加1,将范围调整为[0,2],使得结果更直观且易于与其他评分因素结合。
向量索引结构
为了加速向量检索,通常需要特殊的索引结构:
1. 精确算法:暴力计算所有向量的相似度,准确但效率低。 2. 近似算法:如HNSW(Hierarchical Navigable Small World)、IVF(Inverted File Index)等,牺牲部分精度换取速度。
ElasticSearch 7.x后开始支持向量索引,8.x版本更是提供了原生的KNN查询支持:
"content_vector": {
"type": "dense_vector",
"dims": vector_dim,
"index": True,
"similarity": "cosine"
}
ElasticSearch作为向量数据库的优势
全文检索与向量检索的结合
ElasticSearch的最大优势之一是能同时支持传统的全文检索和向量检索,实现混合查询:
def hybrid_search(query_text, category=None, top_k=3):
# ... 代码省略
if category:
query = {
"bool": {
"must": [knn_query],
"filter": [{"term": {"category": category}}]
}
}
else:
query = knn_query
这种混合查询能够结合语义相似性和精确匹配,提高检索质量。
分布式与高可用性
ElasticSearch天生是为分布式环境设计的,具有:
• 水平扩展能力 • 分片与副本机制 • 自动故障转移 • 高吞吐量
这些特性使其能够应对大规模向量数据的存储和检索需求。
丰富的生态系统
除了基本的向量检索功能,ElasticSearch还提供:
• Kibana可视化 • Beats数据采集 • Logstash数据处理 • X-Pack安全功能
这使得构建完整的生产级向量检索系统变得更加便捷。
本文系统架构与组件
本系统的架构由以下主要组件构成:
1. ElasticSearch服务:核心存储与检索引擎 2. Ollama嵌入服务:负责文本向量化 3. Python应用层:连接上述服务并提供业务功能
数据流向如下:
原始文本 → Ollama嵌入服务 → 向量表示 → ElasticSearch存储
查询文本 → Ollama嵌入服务 → 查询向量 → ElasticSearch检索 → 返回结果
代码实现详解
引入依赖
pip install elasticsearch sentence-transformers
pip install requests
环境准备与初始化
首先,我们导入必要的库并配置日志系统:
import numpy as np
from elasticsearch import Elasticsearch
import time
import requests
import logging
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(),
logging.FileHandler("vector_search.log")
]
)
logger = logging.getLogger("vector_search")
这里使用了Python的标准日志库,配置了双重输出(控制台和文件),确保我们能全面监控系统运行状态。
接着,连接ElasticSearch服务:
# 连接到 Elasticsearch
logger.info("正在连接到 Elasticsearch...")
es = Elasticsearch("http://192.168.1.1:9200")
logger.info(f"Elasticsearch 连接状态: {es.ping()}")
这里我们连接到特定IP地址的ElasticSearch实例。在生产环境中,可能需要配置更复杂的连接参数,比如认证信息、SSL设置等。
向量化模型实现
为了将文本转换为向量,我们实现了一个OllamaEmbedding
类:
class OllamaEmbedding:
def__init__(self, model_name="llama2"):
self.model_name = model_name
self.api_url = "http://192.168.1.125:11434/api/embeddings"
logger.info(f"初始化 Ollama 嵌入模型: {model_name}, API: {self.api_url}")
defencode(self, text, normalize=True):
"""将文本转换为向量表示"""
ifisinstance(text, list):
# 批量处理文本列表
logger.info(f"批量处理 {len(text)} 条文本...")
return [self.encode(t) for t in text]
logger.debug(f"对文本进行向量化: {text[:30]}...")
payload = {
"model": self.model_name,
"prompt": text
}
try:
logger.debug(f"调用 Ollama API...")
response = requests.post(self.api_url, json=payload)
if response.status_code == 200:
embedding = response.json()["embedding"]
logger.debug(f"获取向量成功,维度: {len(embedding)}")
return embedding
else:
error_msg = f"Ollama API调用失败: {response.text}"
logger.error(error_msg)
raise Exception(error_msg)
except Exception as e:
logger.error(f"Ollama 向量化发生异常: {str(e)}")
raise
这个类封装了Ollama嵌入API的调用,提供了文本向量化的功能。值得注意的是:
1. 模型可配置:通过构造函数参数选择不同的嵌入模型。 2. 批量处理:支持单条文本和文本列表的向量化。 3. 错误处理:全面的异常捕获和日志记录。 4. 调试信息:记录详细的调试信息,便于排查问题。
我们初始化了一个使用"bge-m3:latest"模型的嵌入实例:
logger.info("初始化嵌入模型...")
model = OllamaEmbedding("bge-m3:latest") # 或其他支持中文的模型
logger.info("嵌入模型初始化完成")
BGE-M3是BAAI(北京智源人工智能研究院)开发的多语言嵌入模型,对中文有很好的支持。
索引创建与管理
ElasticSearch索引是存储和检索向量数据的基础。我们实现了一个动态创建索引的函数:
def create_index():
# 测试一下向量维度
try:
test_text = "维度测试文本"
test_vector = model.encode(test_text)
vector_dim = len(test_vector)
logger.info(f"检测到向量维度为: {vector_dim}")
except Exception as e:
logger.error(f"测试向量维度失败: {str(e)}")
vector_dim = 1024# 默认维度,根据您的模型调整
logger.info(f"使用默认向量维度: {vector_dim}")
# 删除已存在的索引(如果存在)
if es.indices.exists(index=index_name):
logger.info(f"删除已存在的索引: {index_name}")
es.indices.delete(index=index_name)
time.sleep(3)
# 创建索引,配置向量搜索
logger.info(f"正在创建新索引: {index_name}")
index_settings = {
"mappings": {
"properties": {
"content": {
"type": "text",
"analyzer": "standard"
},
"content_vector": {
"type": "dense_vector",
"dims": vector_dim, # 使用实际检测到的维度
"index": True,
"similarity": "cosine"
},
"category": {
"type": "keyword"
}
}
}
}
try:
es.indices.create(index=index_name, body=index_settings)
logger.info(f"索引 '{index_name}' 创建成功,向量维度: {vector_dim}")
except Exception as e:
logger.error(f"创建索引失败: {str(e)}")
raise
1. 动态维度检测:自动测试嵌入模型的向量维度,确保索引配置与实际向量匹配。 2. 索引重置:检查并删除同名索引,确保每次运行都从干净状态开始。 3. 字段映射: • content
:文本内容,使用标准分析器• content_vector
:向量表示,配置为dense_vector类型• category
:分类标签,用于过滤
"dense_vector"类型是ElasticSearch专为向量数据设计的字段类型,"index: true"表示对该字段建立索引以加速检索,"similarity: cosine"指定使用余弦相似度计算向量间的距离。
样本数据生成
为了演示系统功能,我们生成了一组中文示例数据:
def generate_sample_data():
logger.info("生成示例数据...")
documents = [
{"content": "人工智能正在改变我们的生活方式", "category": "科技"},
{"content": "深度学习在图像识别领域取得了突破性进展", "category": "科技"},
{"content": "中国的经济发展进入了新阶段", "category": "经济"},
{"content": "全球股市今日大幅波动", "category": "经济"},
{"content": "科学家发现了新的星系", "category": "科学"},
{"content": "量子计算将带来计算能力的革命", "category": "科技"},
{"content": "环境保护对可持续发展至关重要", "category": "环保"},
{"content": "新冠疫情对全球经济产生了深远影响", "category": "经济"},
{"content": "机器学习算法能够从数据中学习模式", "category": "科技"},
{"content": "太阳系中可能存在更多类地行星", "category": "科学"},
{"content": "自然语言处理技术使机器能够理解人类语言", "category": "科技"},
{"content": "减少碳排放是应对气候变化的关键", "category": "环保"},
{"content": "区块链技术为金融系统带来创新", "category": "科技"},
{"content": "生物多样性对生态系统健康至关重要", "category": "环保"},
{"content": "宇宙大爆炸理论解释了宇宙的起源", "category": "科学"}
]
logger.info(f"生成了 {len(documents)} 条示例数据")
return documents
这组样本数据涵盖了科技、经济、科学和环保四个类别,内容简洁明了,便于验证检索效果。在实际应用中,可以从外部文件或数据库加载更大规模的真实数据集。
文档索引实现
将文档写入ElasticSearch的过程是向量检索系统的重要一环:
def index_documents(documents):
logger.info(f"开始写入 {len(documents)} 条文档到ES...")
success_count = 0
# 首先测试向量维度
test_vector = model.encode(documents[0]["content"])
logger.info(f"向量维度测试: {len(test_vector)}")
for i, doc inenumerate(documents):
try:
# 生成文本的向量表示
logger.debug(f"向量化文档 {i+1}/{len(documents)}: {doc['content'][:30]}...")
content_vector = model.encode(doc["content"])
# 确保向量是列表类型且元素是浮点数
ifnotisinstance(content_vector, list):
content_vector = content_vector.tolist() ifhasattr(content_vector, 'tolist') elselist(content_vector)
# 确保所有元素都是浮点数
content_vector = [float(v) for v in content_vector]
# 准备文档
document = {
"content": doc["content"],
"content_vector": content_vector,
"category": doc["category"]
}
# 写入到 Elasticsearch
es.index(index=index_name, id=i, document=document)
success_count += 1
logger.debug(f"文档 {i+1} 写入成功")
except Exception as e:
logger.error(f"文档 {i+1} 写入失败: {str(e)}")
# 打印更多错误信息以便调试
import traceback
logger.error(traceback.format_exc())
# 刷新索引以使更改立即可见
es.indices.refresh(index=index_name)
logger.info(f"已成功写入 {success_count}/{len(documents)} 条文档")
1. 数据类型转换:确保向量数据符合ElasticSearch的要求。 2. 逐条处理:单独处理每条记录,避免一条错误影响整批数据。 3. 成功率统计:记录成功写入的文档数量。 4. 索引刷新:写入完成后刷新索引,使新数据立即可搜索。
ElasticSearch版本检测
为了适应不同版本的ElasticSearch,我们实现了版本检测功能:
def get_es_version():
try:
info = es.info()
version = info['version']['number']
logger.info(f"检测到 ElasticSearch 版本: {version}")
return version
except Exception as e:
logger.error(f"获取 ES 版本失败: {str(e)}")
return "7.0.0" # 默认假设为 7.x
这个函数通过调用ElasticSearch的info API获取版本信息。如果无法获取,则默认为7.0.0版本。这种版本检测机制使代码能够适应不同版本的ElasticSearch,提高了兼容性和可移植性。
语义搜索实现
语义搜索是向量检索系统的核心功能:
def semantic_search(query_text, top_k=3):
logger.info(f"执行语义搜索: '{query_text}', top_k={top_k}")
try:
# 将查询文本转换为向量
logger.debug("转换查询文本为向量...")
query_vector = model.encode(query_text)
# 确保向量是列表类型且元素是浮点数
ifnotisinstance(query_vector, list):
query_vector = query_vector.tolist() ifhasattr(query_vector, 'tolist') elselist(query_vector)
query_vector = [float(v) for v in query_vector]
# 检查 ES 版本并构建适当的查询
es_version = get_es_version()
if es_version.startswith("8."):
# ES 8.x 版本的 KNN 查询
query = {
"knn": {
"field": "content_vector",
"query_vector": query_vector,
"k": top_k,
"num_candidates": 100
},
"_source": ["content", "category"]
}
else:
# ES 7.x 版本的查询 (使用 script_score)
query = {
"query": {
"script_score": {
"query": {"match_all": {}},
"script": {
"source": "cosineSimilarity(params.query_vector, 'content_vector') + 1.0",
"params": {"query_vector": query_vector}
}
}
},
"size": top_k,
"_source": ["content", "category"]
}
# 执行搜索
logger.debug("执行ES搜索...")
response = es.search(
index=index_name,
body=query
)
logger.info(f"搜索完成,找到 {len(response['hits']['hits'])} 条结果")
print(f"\n查询: '{query_text}'")
print("搜索结果:")
for hit in response["hits"]["hits"]:
score = hit["_score"]
content = hit["_source"]["content"]
category = hit["_source"]["category"]
print(f"相似度: {score:.4f}, 分类: {category}, 内容: {content}")
except Exception as e:
logger.error(f"语义搜索出错: {str(e)}")
import traceback
logger.error(traceback.format_exc())
print(f"搜索失败: {str(e)}")
这个函数实现了以下功能:
1. 查询向量化:将用户输入的查询文本转换为向量。 2. 版本适配:根据ElasticSearch版本选择适当的查询语法。 • 8.x版本:使用原生KNN查询 • 7.x版本:使用script_score实现向量相似度计算 3. 结果处理:解析并展示搜索结果,包括相似度分数、分类和内容。 4. 错误处理:全面的异常捕获和日志记录。
其中,两种查询方式的区别值得注意:
• 8.x的KNN查询更高效,直接使用向量索引加速检索。 • 7.x的script_score查询虽然较慢,但更灵活,可以结合其他评分因素。
在script_score查询中,我们使用了余弦相似度+1的方式,将范围从[-1,1]调整为[0,2],使得:
• 2分表示完全相同 • 1分表示无相关性 • 0分表示完全相反
混合搜索实现
除了纯粹的向量搜索,我们还实现了结合结构化过滤的混合搜索:
def hybrid_search(query_text, category=None, top_k=3):
filter_text = f"(分类: {category})"if category else""
logger.info(f"执行混合查询: '{query_text}' {filter_text}, top_k={top_k}")
try:
# 将查询文本转换为向量
query_vector = model.encode(query_text)
# 确保向量是列表类型且元素是浮点数
ifnotisinstance(query_vector, list):
query_vector = query_vector.tolist() ifhasattr(query_vector, 'tolist') elselist(query_vector)
query_vector = [float(v) for v in query_vector]
# 检查 ES 版本
es_version = get_es_version()
if es_version.startswith("8."):
# ES 8.x 版本的查询
knn_query = {
"knn": {
"field": "content_vector",
"query_vector": query_vector,
"k": top_k,
"num_candidates": 100
}
}
# 如果指定了分类,添加过滤条件
if category:
logger.debug(f"添加分类过滤: {category}")
query = {
"bool": {
"must": [knn_query],
"filter": [{"term": {"category": category}}]
}
}
else:
query = knn_query
else:
# ES 7.x 版本的查询
script_score = {
"script_score": {
"query": {"match_all": {}},
"script": {
"source": "cosineSimilarity(params.query_vector, 'content_vector') + 1.0",
"params": {"query_vector": query_vector}
}
}
}
# 如果指定了分类,添加过滤条件
if category:
logger.debug(f"添加分类过滤: {category}")
query = {
"bool": {
"must": [script_score],
"filter": [{"term": {"category": category}}]
}
}
else:
query = script_score
# 执行搜索
logger.debug("执行ES混合搜索...")
search_body = {"query": query, "_source": ["content", "category"], "size": top_k}
response = es.search(
index=index_name,
body=search_body
)
logger.info(f"混合搜索完成,找到 {len(response['hits']['hits'])} 条结果")
print(f"\n混合查询: '{query_text}' {f'(分类: {category})' if category else ''}")
print("搜索结果:")
for hit in response["hits"]["hits"]:
score = hit["_score"]
content = hit["_source"]["content"]
category = hit["_source"]["category"]
print(f"相似度: {score:.4f}, 分类: {category}, 内容: {content}")
except Exception as e:
logger.error(f"混合搜索出错: {str(e)}")
import traceback
logger.error(traceback.format_exc())
print(f"搜索失败: {str(e)}")
混合搜索的核心是ElasticSearch的布尔查询(bool query),它允许我们:
1. 组合多种查询:通过must、should、must_not等子句组合不同类型的查询。 2. 结合过滤条件:使用filter子句添加不影响评分的过滤条件。
在我们的实现中,根据是否提供category参数,构建了两种查询:
• 纯向量查询:直接使用KNN或script_score • 混合查询:向量查询+分类过滤
混合查询的优势在于可以在保留语义检索能力的同时,通过结构化过滤缩小结果范围,提高精确度。
主函数与程序流程
主函数整合了上述所有功能,形成完整的程序流程:
def main():
logger.info("=== 向量搜索演示程序开始 ===")
try:
# 创建索引
create_index()
# 生成并索引样本数据
documents = generate_sample_data()
index_documents(documents)
# 等待一秒确保索引已经完成
logger.info("等待索引完成...")
time.sleep(1)
# 进行语义搜索测试
logger.info("=== 开始执行测试查询 ===")
semantic_search("人工智能和机器学习有什么区别")
semantic_search("环境保护和可持续发展")
semantic_search("金融市场和经济发展")
semantic_search("海底捞的火锅不好吃")
# 进行混合查询测试
hybrid_search("人工智能技术", category="科技")
hybrid_search("经济影响和发展", category="经济")
logger.info("=== 演示程序结束 ===")
except Exception as e:
logger.error(f"程序执行出错: {str(e)}")
print(f"程序执行出错: {str(e)}")
if __name__ == "__main__":
main()
这个主函数设计得简洁清晰,总体流程是:
1. 初始化:创建ElasticSearch索引 2. 数据准备:生成样本数据并写入 3. 功能测试:执行多种语义搜索和混合搜索示例 4. 结果展示:打印搜索结果 5. 异常处理:捕获并记录任何异常
值得注意的是,我们添加了time.sleep(1)
,确保索引刷新完成后再进行搜索。在生产环境中,可以使用更可靠的方式检查索引是否准备就绪。
测试查询多样性也很好:
• "人工智能和机器学习有什么区别":技术领域的概念比较 • "环境保护和可持续发展":环保主题 • "金融市场和经济发展":经济相关 • "海底捞的火锅不好吃":与样本数据无关的查询,测试系统对不相关内容的响应
核心算法与原理
向量相似度算法
本系统使用余弦相似度作为向量间的相似度度量:
cosineSimilarity(v1, v2) = dot(v1, v2) (norm(v1) * norm(v2))
其中:
• dot(v1, v2)是两个向量的点积 • norm(v)是向量的L2范数(欧几里得范数)
在ElasticSearch 7.x中,我们使用script_score实现:
"source": "cosineSimilarity(params.query_vector, 'content_vector') + 1.0"
而在ElasticSearch 8.x中,直接使用内置的KNN查询,底层可能采用了更高效的算法(如HNSW):
"knn": {
"field": "content_vector",
"query_vector": query_vector,
"k": top_k,
"num_candidates": 100
}
参数num_candidates
控制了近似算法的精度与速度的平衡,值越大越准确但越慢。
向量索引原理
ElasticSearch的向量索引原理因版本而异:
• 7.x版本:向量存储在索引中但没有专门的向量索引结构,检索时使用脚本计算相似度,是精确但较慢的方法。 • 8.x版本:引入了HNSW算法作为向量索引,创建了一种图结构来加速近似最近邻搜索。
HNSW算法的核心思想是构建一个多层导航图,使得:
1. 顶层有少量节点,用于粗略定位 2. 底层包含所有节点,用于精确搜索 3. 搜索时从顶层开始,逐层下降,最终在底层找到最近邻
这种分层设计大大减少了搜索空间,使得向量搜索可以在次线性时间内完成。
混合查询原理
系统的混合查询基于ElasticSearch的Bool Query:
"bool": {
"must": [knn_query],
"filter": [{"term": {"category": category}}]
}
这种查询的工作原理是:
1. must子句:必须满足,且影响评分。我们放入了向量查询,使得结果按向量相似度排序。 2. filter子句:必须满足,但不影响评分。我们用它进行分类过滤,缩小结果范围。
ElasticSearch的优化使得filter查询特别高效,会自动缓存过滤器结果。
应用场景分析
智能客服系统
在智能客服系统中,本代码可以用于:
1. FAQ匹配:将用户问题与预设的常见问题匹配 2. 知识库检索:从知识库中找出与用户问题相关的文档 3. 多轮对话记忆:存储对话历史并检索相关上下文
内容推荐系统
向量检索在内容推荐系统中的应用:
1. 相似内容推荐:基于用户当前浏览的内容找出相似内容 2. 个性化推荐:将用户兴趣向量与内容向量匹配 3. 冷启动处理:为新用户/新内容提供基于语义而非协同过滤的推荐
企业内部搜索
企业内部文档检索应用:
1. 语义搜索:找出与查询语义相关而非仅包含关键词的文档 2. 权限过滤:结合用户权限进行过滤 3. 多来源整合:统一检索不同系统的文档
法律文档检索
法律文档中的应用有特殊之处:
1. 案例相似性:寻找与当前案例相似的历史判例 2. 法规关联:查找与特定条文相关的其他法规 3. 证据匹配:将证据与法律依据关联
完整代码关注公众号,回复:6481 领取
欢迎关注我的公众号“编程与架构”,原创技术文章第一时间推送。




