Chains
Chains
简介
Chain:链,用于将多个组件(提示模板、LLM模型、记忆、工具等)连接起来,形成可复用的工作流,完成复杂的任务。
只要都组件都实现runnable接口,上一个的输出就是下一个的输入。
LCEL(推荐)
LangChain表达式语言(LCEL,LangChain Expression Language)是一种声明式方法,可以轻松地将多个组件链接成 AI 工作流。它通过Python原生操作符(如管道符 | )将组件连接成可执行流程,显著简化了AI应用的开发。
基本使用:
chain = prompt | model | output_parser
chain.invoke({"input":"What's your name?"})LCEL 中所有组件(Prompt、LLM、Parser、Retriever 等)都实现了 Runnable 接口,拥有统一的调用方法。
1.1 统一调用方法
| 方法 | 说明 | 返回值 |
|---|---|---|
.invoke(input) | 同步单次调用 | 单个结果 |
.stream(input) | 流式输出 | 生成器(逐 token) |
.batch(inputs) | 批量并发调用 | 结果列表 |
.ainvoke() | 异步单次调用 | 协程 |
.astream() | 异步流式输出 | 异步生成器 |
.abatch() | 异步批量调用 | 协程列表 |
# invoke:最基本的调用
result = chain.invoke({"topic": "AI"})
# stream:流式逐字输出,适合前端实时展示
for chunk in chain.stream({"topic": "AI"}):
print(chunk, end="", flush=True)
# batch:多个输入并发处理,效率更高
results = chain.batch([
{"topic": "AI"},
{"topic": "Python"},
{"topic": "数据库"},
])
# ainvoke:异步环境(FastAPI、Jupyter 等)使用
result = await chain.ainvoke({"topic": "AI"})⚠️ 注意:
batch在底层会并发执行,比循环调用invoke快得多,生产环境推荐使用。
提示
在 Python 中,运算符(如 +、|)的行为由类的魔法方法决定。例如:
a + b 本质调用的是 a.add(b)
a | b 本质调用的是 a.or(b)
只需要自行实现类的or方法,即可对|符号的功能进行重写。
runnable基类对or进行了改写
二、基础组合:管道符 |
2.1 单链:线性串联
| 将左侧输出作为右侧输入,构成线性处理流水线。
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
prompt = ChatPromptTemplate.from_template("请用一句话介绍:{topic}")
llm = ChatOpenAI(model="gpt-4o")
parser = StrOutputParser()
# 等价于 prompt.pipe(llm).pipe(parser)
chain = prompt | llm | parser
chain.invoke({"topic": "向量数据库"})💡 关键:
|本质是语法糖,等价于.pipe()方法。数据流方向:prompt → llm → parser,每一步的输出类型必须与下一步的输入类型匹配。
2.2 输入输出类型对照
字典 dict
→ ChatPromptTemplate → ChatPromptValue
→ ChatOpenAI → AIMessage
→ StrOutputParser → str三、多输入处理:RunnableParallel
RunnableParallel 让多个子链同时接收相同输入并并行执行,结果合并为字典。
3.1 显式创建
from langchain_core.runnables import RunnableParallel
parallel = RunnableParallel({
"summary": summary_chain,
"sentiment": sentiment_chain,
"keywords": keyword_chain,
})
result = parallel.invoke({"text": "今天发布了重要政策..."})
# 输出:
# {
# "summary": "...",
# "sentiment": "中性",
# "keywords": ["政策", "发布"]
# }3.2 隐式简写(最常用)
⚠️ 关键规则:在管道中,字典字面量会自动转换为
RunnableParallel,无需显式声明。
# 字典 {"context": ..., "question": ...} 自动成为并行步骤
# retriever 和 RunnablePassthrough() 同时接收同一个输入
chain = (
{"context": retriever, "question": RunnablePassthrough()}
| prompt
| llm
| StrOutputParser()
)
chain.invoke("向量数据库是什么?")数据流示意:
"向量数据库是什么?"
│
├──► retriever ──► "检索到的文档内容..." ─┐
│ │ 合并为 dict → prompt → llm → parser
└──► passthrough ──► "向量数据库是什么?" ─┘四、透传输入:RunnablePassthrough
RunnablePassthrough 的作用是将输入原封不动地传递到下一步,常用于在并行步骤中保留原始输入。
4.1 基本透传
from langchain_core.runnables import RunnablePassthrough
# 仅透传,不做处理
chain = RunnablePassthrough() | some_chain4.2 .assign():透传 + 追加新字段
💡 核心用法:在保留所有原有字段的同时,计算并追加新字段,常用于链式累积数据。
# 输入 dict 的所有字段原样保留,同时追加 summary 和 word_count
enriched = RunnablePassthrough.assign(
summary=summarize_chain, # 用子链计算
word_count=lambda x: len(x["text"].split()), # 用函数计算
)
result = enriched.invoke({"text": "一段很长的文字内容..."})
# 输出:
# {
# "text": "一段很长的文字内容...", ← 原有字段保留
# "summary": "内容摘要", ← 新增
# "word_count": 7 ← 新增
# }五、自定义逻辑:RunnableLambda
将任意 Python 函数包装为 Runnable 组件,插入管道中。
5.1 显式包装
from langchain_core.runnables import RunnableLambda
def parse_score(text: str) -> dict:
score = int(text.strip())
return {"score": score, "level": "高" if score > 7 else "低"}
chain = prompt | llm | StrOutputParser() | RunnableLambda(parse_score)5.2 隐式简写
💡 简写:在管道中,Python 函数和 lambda 会自动转为
RunnableLambda,无需手动包装。
# 函数直接放入管道
chain = prompt | llm | StrOutputParser() | (lambda x: x.upper())
# 普通函数也可以
chain = prompt | llm | StrOutputParser() | my_custom_parser六、条件分支:RunnableBranch
根据输入内容动态路由到不同子链,实现条件逻辑。
6.1 基本用法
from langchain_core.runnables import RunnableBranch
branch = RunnableBranch(
# 格式:(条件函数, 对应链),从上到下依次判断,命中第一个
(lambda x: x["lang"] == "zh", chinese_chain),
(lambda x: x["lang"] == "en", english_chain),
(lambda x: x["lang"] == "ja", japanese_chain),
default_chain # ⚠️ 必须提供默认分支,否则报错
)
branch.invoke({"lang": "zh", "text": "你好"})6.2 结合路由判断的完整示例
# 先用 LLM 判断类型,再路由到专业处理链
classify_chain = classify_prompt | llm | StrOutputParser()
full_chain = classify_chain | RunnableBranch(
(lambda x: "技术" in x, tech_chain),
(lambda x: "法律" in x, legal_chain),
general_chain
)七、字段提取:itemgetter
从字典中取特定字段,常用于并行结果后的字段分发。
from operator import itemgetter
chain = (
RunnableParallel({
"answer": answer_chain,
"sources": source_chain,
"question": RunnablePassthrough(),
})
| {
# 分别对不同字段做后处理
"formatted_answer": itemgetter("answer") | format_chain,
"source_count": itemgetter("sources") | (lambda x: len(x)),
"original_q": itemgetter("question"),
}
)八、综合实战:RAG 完整链路
以下是一个集成了检索增强(RAG)+ 并行评估的完整示例,综合运用以上所有概念。
from langchain_core.runnables import RunnableParallel, RunnablePassthrough, RunnableLambda
from operator import itemgetter
# ── Step 1:检索步骤(并行:文档检索 + 保留原始问题)──
retrieval = {
"context": retriever | format_docs, # 检索并格式化文档
"question": RunnablePassthrough(), # 原始问题透传
}
# ── Step 2:生成答案 ──
generate = answer_prompt | llm | StrOutputParser()
# ── Step 3:并行输出(答案 + 分类 + 原始问题)──
full_chain = (
retrieval
| RunnablePassthrough.assign(answer=generate) # 追加 answer 字段
| RunnableParallel({
"answer": itemgetter("answer"),
"category": itemgetter("question") | classify_prompt | llm | StrOutputParser(),
"question": itemgetter("question"),
})
)
result = full_chain.invoke("向量数据库和传统数据库有什么区别?")
# {
# "answer": "向量数据库专为高维向量相似度搜索设计...",
# "category": "技术对比",
# "question": "向量数据库和传统数据库有什么区别?"
# }九、调试与可观测性
9.1 插入调试节点
def debug(x, label="DEBUG"):
print(f"[{label}] type={type(x).__name__}, value={x}")
return x # ⚠️ 必须返回 x,否则中断管道
chain = (
prompt
| RunnableLambda(lambda x: debug(x, "after_prompt"))
| llm
| RunnableLambda(lambda x: debug(x, "after_llm"))
| StrOutputParser()
)9.2 命名与追踪
# 给链命名,便于 LangSmith 等追踪工具识别
chain = chain.with_config({"run_name": "my_rag_chain"})
# 覆盖运行时配置(如切换模型)
chain.invoke(input, config={"run_name": "test_run"})9.3 .get_graph() 可视化结构
# 打印链的拓扑结构(ASCII 图)
chain.get_graph().print_ascii()十、核心概念速查
┌─────────────────────────────┐
│ LCEL 组件总览 │
└─────────────────────────────┘
管道符 | A | B | C 线性串联,A 的输出 → B 的输入
RunnableParallel {"a": chainA, 同时运行多个链,
"b": chainB} 合并为 dict 输出
RunnablePassthrough passthrough() 原样透传输入
RunnablePassthrough .assign(k=fn) 透传全部字段 + 追加新字段
.assign()
RunnableLambda lambda x: ... 任意函数包装为 Runnable
RunnableBranch branch( 条件路由,选择不同子链
(cond, chain), ...)
itemgetter itemgetter("key") 从 dict 中取特定字段类型转换速查
输入类型 → 自动转换为
─────────────────────────────
dict 字面量 → RunnableParallel
Python 函数 → RunnableLambda
lambda 表达式 → RunnableLambda📌 最重要的一条:理解 LCEL 的关键是掌握数据在每一步的形状(类型)。中间结果是
str、dict、AIMessage还是list,决定了下一个组件能否正确接收。遇到报错,优先用调试节点打印中间值。
