暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

LangChain---LCEL深度解析:构建模块化与可组合的智能链条

AI云枢 2025-06-02
133
【本期目标】
  • 掌握 LCEL (LangChain Expression Language) 的核心调用方式 (.invoke().stream().batch())。
  • 理解 LCEL 的数据流控制机制 (|RunnablePassthroughRunnableParallel)。
  • 学会如何自定义 Runnable,将任意业务逻辑融入链条。
  • 了解如何使用 .bind() 等方法精细化控制模型行为
  • 通过综合案例,实践 LCEL 在复杂流程中的应用。

LCEL核心操作:执行与流式传输
Runnable 是 LangChain 的核心,它定义了统一的输入和输出接口。与 Runnable 交互,主要通过以下几种调用方法:
  1. .invoke():同步单次调用
    • 这是最直接的调用方式。给定一个输入,它会同步地执行链条,并返回最终结果。
    • 适用于一次性获取结果的场景。
  2. .stream():同步流式调用
    • 对于聊天模型,结果往往是逐字或逐句生成的。.stream() 方法会返回一个迭代器,你可以实时地获取到模型生成内容的每一部分(chunk)。
    • 适用于聊天机器人、实时内容生成等需要即时反馈的场景。
  3. .batch():同步批量调用
    • 如果你有多个独立的输入需要同时处理,.batch() 可以提高效率。它接受一个输入列表,返回一个对应输出的列表。
    • 适用于对大量数据进行批处理,例如文档摘要、情感分析等。
  4. 异步版本 (.ainvoke().astream().abatch()):
    • 所有上述方法都有对应的异步版本(以 a 开头)。在Python的 asyncio 环境下,它们可以实现非阻塞的并发执行,尤其在处理大量请求或进行I/O密集型操作时,能显著提升性能。
    • 本次主要使用同步版本,但理解异步是未来生产环境部署的关键。
【代码示例】
    from dotenv import load_dotenv
    import os
    from langchain_openai import ChatOpenAI
    from langchain_core.prompts import ChatPromptTemplate
    from langchain_core.output_parsers import StrOutputParser
    import asyncio


    load_dotenv()
    model = ChatOpenAI(model="Qwen3-235B-A22B", temperature=0.7)
    prompt = ChatPromptTemplate.from_template("讲一个关于{topic}的简短笑话。")
    chain = prompt | model | StrOutputParser()


    print("--- 1. .invoke():单次同步调用 ---")
    joke_sync = chain.invoke({"topic""编程"})
    print(f"编程笑话: {joke_sync}\n")


    print("--- 2. .stream():流式调用 ---")
    print("小猫笑话 (逐字输出):")
    for chunk in chain.stream({"topic""小猫"}):
        print(chunk, end="", flush=True)
    print("\n")


    print("--- 3. .batch():批量调用 ---")
    topics = ["大象""咖啡""侦探"]
    jokes_batch = chain.batch([{"topic": t} for t in topics]) # 传入一个字典列表
    print("批量生成的笑话:")
    for i, joke in enumerate(jokes_batch):
        print(f"关于'{topics[i]}'的笑话: {joke}")
    print("\n")


    # --- 4. .ainvoke():异步调用示例 ---
    async def run_async_example():
        print("--- .ainvoke():异步调用 ---")
        joke_async = await chain.ainvoke({"topic""外星人"})
        print(f"外星人笑话 (异步): {joke_async}")


    if __name__ == "__main__":
        asyncio.run(run_async_example()) # 运行异步函数
    小结: 掌握这四种调用方式,你就掌握了与任何LCEL链条进行交互的基础。

    LCEL核心构建:管道与数据流控制
    LCEL 的强大之处在于它提供了灵活的数据流控制机制,让你能精准地定义数据如何穿梭于各个 Runnable 之间。
    1. 管道操作符 | (和 .pipe()):
      • 这是LCEL最核心的连接符。它将一个 Runnable 的输出作为下一个 Runnable 的输入。
      • A | B | C 的含义是:将 A 的输出传给 B,再将 B 的输出传给 C。简洁而直观!
      • 注意:每个 Runnable 必须能接受前一个 Runnable 的输出作为输入。
      # 示例:简单的文本处理管道
      from langchain_core.runnables import RunnableLambda


      # 自定义一个简单的Runnable,将文本转为大写
      to_uppercase = RunnableLambda(lambda text: text.upper())
      # 再自定义一个Runnable,添加感叹号
      add_exclamation = RunnableLambda(lambda text: text + "!!!")


      simple_pipeline = StrOutputParser() | to_uppercase | add_exclamation


      print("--- 管道操作符 | 示例 ---")
      processed_text = simple_pipeline.invoke("hello world")
      print(f"处理后的文本: {processed_text}\n"# 输出:HELLO WORLD!!!
      2. RunnablePassthrough:数据透传与赋值
        • 当链条中某个 Runnable 的输出不需要作为下一个 Runnable 的输入,但你希望原始输入或某个中间值能继续传递到下游时,RunnablePassthrough 就派上用场了。
        • 基本用法: 将输入原封不动地传递下去。
        • .assign() 方法: 最强大的功能。它允许你在保持原始输入不变的基础上,动态地增加或修改输入字典中的键值对。这是实现多输入链条的关键!
        from langchain_core.runnables import RunnablePassthrough
        # 例子:计算文本长度,并将长度信息和原始文本一起传递给LLM


        # 链条:原始输入 (例如 {"text": "..."})
        # -> RunnablePassthrough.assign(length=lambda x: len(x["text"]))
        #    (现在输入变成了 {"text": "...", "length": 123})
        # -> Prompt (可以同时使用 {text} 和 {length})


        length_and_text_prompt = ChatPromptTemplate.from_template(
            "原始文本: {text}\n文本长度: {length}\n请根据原始文本,总结其主要观点。"
        )


        # 这里的 RunnablePassthrough.assign() 动态地计算了 'length' 并加入了输入字典
        # 原始的 'text' 仍然存在并被传递下去
        chain_with_length = (
            RunnablePassthrough.assign(
                length=lambda input_dict: len(input_dict["text"])
            )
            | length_and_text_prompt
            | model
            | StrOutputParser()
        )


        print("--- RunnablePassthrough.assign() 示例 ---")
        result = chain_with_length.invoke({"text""LangChain让构建大模型应用变得如此简单,大大提高了开发效率!"})
        print(f"总结与长度信息: {result}\n")
        3. RunnableParallel:并行处理
          • 当你有多个独立的 Runnable 需要同时执行,并将它们的输出合并成一个字典时,RunnableParallel 是最佳选择。
          • 它接受一个字典,字典的每个键是一个输出名称,对应的值是一个 Runnable。这些 Runnable 会并行执行,并将各自的输出作为新字典中的值。
          from langchain_core.runnables import RunnableParallel


          # 例子:同时让LLM做文本总结和关键词提取
          summarize_chain = ChatPromptTemplate.from_template("总结以下文本:\n{text}") | model | StrOutputParser()
          keywords_chain = ChatPromptTemplate.from_template("从以下文本中提取3-5个关键词(用逗号分隔):\n{text}") | model | StrOutputParser()


          # 使用 RunnableParallel 将两个链并行化
          # 注意:这里的输入 {text} 会被同时传递给 summarize_chain 和 keywords_chain
          parallel_processor = RunnableParallel(
              summary=summarize_chain,
              keywords=keywords_chain
          )


          text_to_analyze = "区块链技术是一种去中心化的分布式账本技术,广泛应用于加密货币领域,其核心特点是不可篡改性和透明性。"


          print("--- RunnableParallel 示例 ---")
          results = parallel_processor.invoke({"text": text_to_analyze})
          print(f"总结:\n{results['summary']}")
          print(f"关键词:\n{results['keywords']}\n")
          小结:| 用于串联,RunnablePassthrough 用于数据传递和增强,RunnableParallel 用于并行处理。它们共同构成了LCEL灵活的数据流控制能力。

          LCEL高级技巧:自定义与参数绑定
          LCEL 允许你将任意的 Python 逻辑集成到链条中,并能精细地控制模型的行为。
          1. 自定义 Runnable
          • 函数式 (@chain 装饰器): 最简洁的方式,适用于将普通函数快速转换为 Runnable
            from langchain_core.runnables import chain


            @chain
            def clean_text(text: str) -> str:
                """一个自定义Runnable,清理文本中的多余空格并转换为小写。"""
                return text.lower().strip().replace("  "" ")


            @chain
            def censor_words(text: str, words_to_censor: list) -> str:
                """一个自定义Runnable,审查特定词汇。"""
                for word in words_to_censor:
                    text = text.replace(word, "[CENSORED]")
                return text


            custom_logic_chain = (
                ChatPromptTemplate.from_template("请处理这段文本:\n{text_input}")
                | model
                | StrOutputParser()
                | clean_text # 使用自定义函数Runnable
                | censor_words.bind(words_to_censor=["糟糕""问题"]) # 绑定参数给自定义函数
            )


            print("--- 自定义 Runnable (函数式) 示例 ---")
            original_input = "  这是一个 糟糕的 测试文本,存在 一些 问题。  "
            processed_output = custom_logic_chain.invoke({"text_input": original_input})
            print(f"原始文本: '{original_input}'")
            print(f"处理后文本: '{processed_output}'\n")
            • 类式 (Runnable 基类): 当你的自定义逻辑更复杂、需要内部状态、或需要更严格的类型定义时,继承 Runnable 类是更好的选择。
              from langchain_core.runnables import Runnable
              from pydantic import BaseModel,Field


              class WordCounter(Runnable):
                  def __init__(self, target_word: str):
                      self.target_word = target_word.lower()
                  def invoke(self, input_dict: dict, config=None) -> dict:
                      """计算目标词在文本中出现的次数"""
                      input_text = input_dict["text_input"]
                      count = input_text.lower().count(self.target_word)
                      return {"original_text": input_text, "word_count": count, "target": self.target_word}
                  # 可以选择定义输入/输出Schema,有助于类型检查和工具使用
                  class InputSchema(BaseModel):
                      text_input: str = Field(..., description="需要计数的文本")
                  class OutputSchema(BaseModel):
                      original_text: str = Field(..., description="原始文本")
                      word_count: int = Field(..., description="目标词出现次数")
                      target: str = Field(..., description="目标词")


                  def get_input_schema(self, config=None):
                      return self.InputSchema
                  def get_output_schema(self, config=None):
                      return self.OutputSchema


              word_count_chain = (
                  RunnablePassthrough.assign(text_input=lambda x: x["text"]) # 从字典中提取文本
                  | WordCounter(target_word="代码"# 使用自定义类Runnable
                  | ChatPromptTemplate.from_template(
                      "在原始文本 '{original_text}' 中,词语 '{target}' 出现了 {word_count} 次。这是一个准确的计数。"
                  )
                  | model
                  | StrOutputParser()
              )


              print("--- 自定义 Runnable (类式) 示例 ---")
              text_for_count = {"text""LangChain让写代码变得更有趣,代码质量也更高了。"}
              result_word_count = word_count_chain.invoke(text_for_count)
              print(result_word_count)
              小结:@chain 适合快速封装函数,继承 Runnable 适合复杂逻辑和更严格的类型定义。
              2. .bind():绑定模型参数
                • .bind() 允许你在链的特定位置,为下游的模型绑定额外的参数。这非常灵活,因为你可以在同一个应用中使用同一个模型,但在不同的链条中为其绑定不同的行为。
                • 常见用途:
                  • temperature:控制模型生成文本的随机性。
                  • stop:指定模型停止生成的字符序列。
                  • tools / tool_choice这是最重要、最常用的绑定,用于实现 Function Calling
                # 例子:绑定 temperature 让模型输出更具创意
                creative_chain = ChatPromptTemplate.from_template("写一个关于{topic}的诗歌。"| model.bind(temperature=0.8| StrOutputParser()
                strict_chain = ChatPromptTemplate.from_template("总结以下文本的核心要点:\n{text}"| model.bind(temperature=0.0| StrOutputParser()


                print("\n--- .bind() 示例 ---")
                print("创意诗歌:")
                print(creative_chain.invoke({"topic""星空"}))
                print("\n严格总结:")
                print(strict_chain.invoke({"text""LCEL是LangChain Expression Language的缩写,是LangChain的核心。它提供了一种声明式的方式来构建链条,并通过管道操作符实现组件的连接。LCEL的优势在于其模块化、可组合性和易于调试的特性。"}))
                小结:.bind() 赋予你在链中精细控制模型行为的能力。
                3. .with_config():配置运行时行为
                  • 允许你在链的某个特定部分配置运行时参数,例如 LangSmith 的 run_nametags,或者超时时间等。这对于调试和监控非常有用。
                  # 示例:为链条添加一个Tag,方便LangSmith追踪
                  # 假设 chain 是你在上面定义的某个链
                  tagged_chain = chain.with_config({"tags": ["my_first_tag""demo_chain"]})
                  # 当你运行 tagged_chain.invoke() 时,这些tags会出现在LangSmith的追踪记录中

                  综合案例:构建智能内容摘要与关键词提取器
                  构建一个实用的小工具:它能同时对一篇长文本进行摘要和关键词提取。
                    from dotenv import load_dotenv
                    import os
                    from langchain_openai import ChatOpenAI
                    from langchain_core.prompts import ChatPromptTemplate
                    from typing import List # 导入 List
                    from langchain_core.output_parsers import StrOutputParser, JsonOutputParser
                    from langchain_core.runnables import RunnablePassthrough, RunnableParallel, chain
                    from pydantic import BaseModel,Field


                    load_dotenv()
                    model = ChatOpenAI(model="Qwen3-235B-A22B", temperature=0.3)
                    parser = StrOutputParser()


                    # --- 1. 定义总结的链 ---
                    summarize_prompt = ChatPromptTemplate.from_template("请对以下文本进行简洁的摘要,不超过100字:\n{text}")
                    summarize_chain = summarize_prompt | model | parser


                    # --- 2. 定义提取关键词的链 ---
                    # 定义 Pydantic 模型来表示关键词列表的输出结构
                    class Keywords(BaseModel):
                        keywords: List[str] = Field(description="一组关键词")
                    keywords_prompt = ChatPromptTemplate.from_template(
                        "从以下文本中提取5个最重要的关键词,以JSON数组格式输出,例如: [\"词1\", \"词2\"]:\n{text}"
                    )
                    # 使用 with_structured_output 绑定 schema,确保模型输出JSON
                    keywords_chain = keywords_prompt | model.with_structured_output(
                        schema=Keywords
                    )


                    # --- 3. 自定义Runnable,用于文本预处理 ---
                    @chain
                    def preprocess_text(text: str) -> str:
                        """移除文本中的多余换行符和一些特殊字符。"""
                        cleaned = text.replace("\n"" ").replace("  "" ").replace("。"".").strip()
                        return cleaned


                    # --- 构建总的LCEL链条 ---
                    # 整个流程需要输入一个 {text_to_process}
                    full_content_analyzer = (
                        RunnablePassthrough.assign(
                            processed_text=lambda x: preprocess_text.invoke(x["text"]), # 明确传递 x["text"] 给 preprocess_text
                        )
                        | RunnableParallel( # 并行执行摘要和关键词提取
                            summary=summarize_chain.with_config(run_name="SummaryGeneration"), # 给子链命名,方便LangSmith追踪
                            keywords=keywords_chain.with_config(run_name="KeywordExtraction")
                        )
                        # 最终输出是 {"summary": "...", "keywords": ["...", "..."]}
                    )


                    # --- 运行案例 ---
                    long_text = """
                    LangChain是一个非常强大的框架,旨在帮助开发者更高效地构建基于大语言模型(LLMs)的应用程序。
                    它的核心理念是将复杂的LLM应用分解为模块化的组件(称为Runnables),然后通过LCEL(LangChain Expression Language)将这些组件像管道一样连接起来。
                    LCEL提供了极其灵活的数据流控制能力,包括并行处理和条件分支。
                    这使得开发者能够轻松实现各种复杂的AI应用,例如:
                    1. 问答系统:结合外部知识库(RAG)。
                    2. 聊天机器人:管理对话历史和上下文。
                    3. Agent:让LLM具备自主决策和工具调用能力。
                    LangChain强调LCEL的简洁性和可组合性,以及与LangSmith工具的深度集成,
                    LangSmith可以帮助开发者进行调试、测试和评估,大大提升了开发体验。
                    """


                    print("--- 智能内容摘要与关键词提取 ---")
                    analysis_results = full_content_analyzer.invoke({"text": long_text})


                    print("------ 结果 ------")
                    print(f"**摘要:**\n{analysis_results['summary']}")
                    print(f"\n**关键词:**")
                    print(f"DEBUG: type of keywords: {type(analysis_results['keywords'])}")
                    print(f"DEBUG: content of keywords: {analysis_results['keywords']}")
                    print(f"{', '.join(analysis_results['keywords'].keywords)}")
                    print("------------------")
                    代码解析:
                    1. 数据准备: long_text 作为输入。
                    2. 预处理:RunnablePassthrough.assign(processed_text=preprocess_text) 这一步很巧妙。它会执行 preprocess_text 这个自定义 Runnable,并将 long_text 作为其输入。preprocess_text 的输出(清理后的文本)会赋值给 processed_text 键。同时,原始的 text 键会继续存在于输入字典中,供后续 summarize_chain 和 keywords_chain 使用。
                    3. 并行处理:RunnableParallel 包含了 summarize_chain 和 keywords_chain。它们会同时执行,并各自接收 text 作为输入。
                    4. 最终输出:full_content_analyzer 的 invoke 方法会返回一个字典,其中包含 summary 和 keywords 两个键,对应各自任务的输出。

                    LCEL优势
                    为什么LCEL 已成为构建所有应用的推荐方式?
                    • 极简的语法: 管道符 | 让链条结构一目了然,大大提高了代码的可读性和维护性。
                    • 高度模块化: 每个 Runnable 都是一个独立的、可重用的组件,你可以像搭积木一样自由组合它们。
                    • 强大的灵活性:.assign()RunnableParallel.bind() 等方法提供了对数据流和模型行为的精细控制。
                    • 原生支持流式和异步:.stream() 和 async 方法让实时应用和高性能并发成为可能。
                    • 卓越的可观察性: LCEL 链与 LangSmith 无缝集成,提供无与伦比的调试和性能分析能力。
                    • 兼容性: LCEL 可以轻松集成旧的 Chain 类(它们也继承了 Runnable),确保平滑过渡。
                    总之,LCEL 让 LangChain 从一个“工具箱”升级为了一套“工程化开发范式”,让构建复杂AI应用变得更加高效和可靠。

                    文章转载自AI云枢,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                    评论