今天我们来深入了解 Deer Flow 的核心架构,特别是它如何利用 LangGraph 进行状态管理,让我们一步步解析 Deer Flow 的精妙之处。
LangGraph 简介:为什么选择 LangGraph?
Deer Flow 选择 LangGraph 作为其核心流程控制框架,主要是看中了 LangGraph 在构建复杂代理(Agent)系统方面的强大能力。我们可以从以下几个方面理解其优势:
Stateful (状态管理):LangGraph 的核心设计就是围绕状态(State)进行的。在多步骤、多Agent协作的复杂流程中,能够清晰地定义、传递和更新状态至关重要。Deer Flow 正是利用这一点来管理其研究(Research)过程中的各种信息和中间结果。
Multi-actor (多参与者):现代AI应用,尤其是像 Deer Flow 这样的研究型AI助手,往往需要多个具有不同角色的Agent协同工作(例如计划、执行研究、编码、报告等)。LangGraph 提供了构建多Agent图(Graph)的能力,使得每个Agent可以作为图中的一个节点(Node),并定义它们之间的协作流程。
Cycles (循环与迭代):研究过程往往不是线性的,可能需要多次迭代、反思和修正。LangGraph 支持在图中构建循环(Cycles),这使得 Deer Flow 能够实现如“计划-执行-反馈-修正计划”这样的迭代式工作流,从而提升最终结果的质量。
Deer Flow 中的“状态”核心:State
详解
在 Deer Flow 中,整个研究流程的状态管理是通过一个核心的 State
类来实现的。这个类定义在 src/graph/types.py
文件中。
代码导览:src/graph/types.py
- State
的定义
State
类继承自 LangGraph 的 MessagesState
,并在此基础上扩展了一些 Deer Flow 特有的状态属性,用于在不同的Agent节点之间传递信息、控制流程。
Python
# Copyright (c) 2025 Bytedance Ltd. and/or its affiliates
# SPDX-License-Identifier: MIT
import operator
from typing import Annotated
from langgraph.graph import MessagesState
from src.prompts.planner_model import Plan
class State(MessagesState):
"""State for the agent system, extends MessagesState with next field."""
# Runtime Variables
locale: str = "en-US"
observations: list[str] = []
plan_iterations: int = 0
current_plan: Plan | str = None
final_report: str = ""
auto_accepted_plan: bool = False
enable_background_investigation: bool = True
background_investigation_results: str = None
关键属性解读:
messages
: 继承自MessagesState
,用于存储对话历史和Agent间的消息传递。locale
: 当前用户交互的语言环境,例如 "en-US"。observations
: 一个列表,用于收集各个研究步骤中产生的观察结果或发现。plan_iterations
: 记录计划(Plan)被修改或迭代的次数。current_plan
: 当前正在执行的研究计划,可以是结构化的Plan
对象或其字符串表示。final_report
: 最终生成的研究报告。auto_accepted_plan
: 布尔值,指示计划是否被自动接受(例如,在不需要人工审核的情况下)。enable_background_investigation
: 布尔值,控制是否在制定计划前进行背景调查。background_investigation_results
: 存储背景调查的结果。
这个 State
对象就像一个“接力棒”,在图的各个节点间传递,每个节点根据当前状态执行任务,并可以更新状态,然后传递给下一个节点。
图的构建:src/workflow.py
与 src/graph/builder.py
Deer Flow 的核心工作流程是通过 LangGraph 构建的一个状态图(StateGraph)来驱动的。
src/graph/builder.py
中的图构建逻辑
这个文件负责定义和构建整个Agent工作流图。
Python
# Copyright (c) 2025 Bytedance Ltd. and/or its affiliates
# SPDX-License-Identifier: MIT
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from .types import State
from .nodes import (
coordinator_node,
planner_node,
reporter_node,
research_team_node,
researcher_node,
coder_node,
human_feedback_node,
background_investigation_node,
)
def _build_base_graph():
"""Build and return the base state graph with all nodes and edges."""
builder = StateGraph(State)
builder.add_edge(START, "coordinator")
builder.add_node("coordinator", coordinator_node)
builder.add_node("background_investigator", background_investigation_node)
builder.add_node("planner", planner_node)
builder.add_node("reporter", reporter_node)
builder.add_node("research_team", research_team_node)
builder.add_node("researcher", researcher_node)
builder.add_node("coder", coder_node)
builder.add_node("human_feedback", human_feedback_node)
builder.add_edge("reporter", END)
return builder
def build_graph_with_memory():
"""Build and return the agent workflow graph with memory."""
# use persistent memory to save conversation history
# TODO: be compatible with SQLite / PostgreSQL
memory = MemorySaver()
# build state graph
builder = _build_base_graph()
return builder.compile(checkpointer=memory)
def build_graph():
"""Build and return the agent workflow graph without memory."""
# build state graph
builder = _build_base_graph()
return builder.compile()
graph = build_graph()
关键点:
StateGraph(State)
: 创建一个状态图实例,并指定使用我们之前定义的State
类作为图的状态模型。add_node(name, action)
: 将各个Agent(或其他逻辑处理单元)作为节点添加到图中。这些节点的具体实现在src/graph/nodes.py
中。add_edge(start_node, end_node)
: 定义节点之间的直接连接。compile()
: 将定义好的图结构编译成可执行的工作流。
src/workflow.py
中的工作流驱动
这个文件利用 src/graph/builder.py
中构建好的图,来实际运行Agent工作流。
Python
# Copyright (c) 2025 Bytedance Ltd. and/or its affiliates
# SPDX-License-Identifier: MIT
import asyncio
import logging
from src.graph import build_graph # 导入构建图的函数
# ... (logging configuration) ...
logger = logging.getLogger(__name__)
# Create the graph
graph = build_graph() # 构建图实例
async def run_agent_workflow_async(
user_input: str,
# ... (other parameters) ...
):
# ... (input validation and debug setup) ...
logger.info(f"Starting async workflow with user input: {user_input}")
initial_state = {
"messages": [{"role": "user", "content": user_input}],
"auto_accepted_plan": True,
"enable_background_investigation": enable_background_investigation,
}
config = {
# ... (configuration for the run) ...
}
last_message_cnt = 0
async for s in graph.astream( # 异步执行图
input=initial_state, config=config, stream_mode="values"
):
# ... (process and print stream output) ...
logger.info("Async workflow completed successfully")
if __name__ == "__main__":
print(graph.get_graph(xray=True).draw_mermaid()) # 可以打印出图的 Mermaid 定义,便于可视化
核心在于:
graph = build_graph()
: 获取编译好的工作流图。graph.astream(input=initial_state, ...)
: 使用初始的用户输入(包装在initial_state
中)来异步启动并流式执行这个图。
Agent 如何成为图中的节点:src/graph/nodes.py
的角色
src/graph/nodes.py
文件是 Deer Flow 核心逻辑的汇集地,它定义了图中各个节点的具体行为。每个节点通常对应一个Agent或者一个特定的处理阶段。
例如,文件中包含以下节点(部分列举):
coordinator_node
: 协调器节点,通常是流程的入口,负责初步理解用户意图并决定是否转交给计划器。background_investigation_node
: 背景调查节点,在正式计划前进行初步的信息搜集。planner_node
: 计划器节点,负责根据当前状态(可能包含用户输入、背景调查结果等)生成详细的研究计划。human_feedback_node
: 人工反馈节点,允许用户审查计划并提供反馈,实现人机协作。research_team_node
: 研究团队节点,这是一个调度节点,根据计划中未完成的步骤,决定接下来是调用研究员(researcher_node
)还是编码员(coder_node
)。researcher_node
: 研究员节点,执行具体的研究任务(例如网络搜索、文档阅读)。coder_node
: 编码员(或数据处理)节点,执行需要代码处理的任务(例如运行Python脚本分析数据)。reporter_node
: 报告员节点,根据收集到的所有信息和观察结果,撰写最终的研究报告。
每个这样的节点函数接收当前的 State
对象作为输入,执行其特定的逻辑(通常会调用大语言模型LLM,并可能使用各种工具Tool),然后返回一个 Command
对象或一个字典来更新状态,并可能指定下一个要跳转到的节点(这通常通过 builder.py
中的条件边定义)。
状态在图中如何流转与更新
状态的流转和更新是 LangGraph 的核心机制,在 Deer Flow 中体现得淋漓尽致:
初始状态:工作流从
workflow.py
中定义的initial_state
开始,通常包含用户的初始请求。节点执行:LangGraph 根据图的定义(边的连接和条件)将当前状态传递给相应的节点函数(定义在
nodes.py
中)。状态读取与任务执行:节点函数读取传入
State
中的信息,执行其任务。例如,planner_node
会读取state["messages"]
和state["background_investigation_results"]
来制定计划。状态更新:节点函数执行完毕后,会返回一个字典,这个字典的键值对会被用来更新 State 对象。
例如,planner_node 在生成计划后,可能会返回:
Python
return Command( update={ "messages": [AIMessage(content=full_response, name="planner")], "current_plan": new_plan, # 更新当前计划 }, goto="reporter", # 或者 "human_feedback" )
这里的
update
字典的内容会合并到当前的State
中。条件流转:在
builder.py
中定义的条件边(Conditional Edges)会检查更新后的State
,根据预设的逻辑(通常是一个返回节点名的函数)决定下一个激活哪个节点。例如,research_team_node
会检查current_plan
中的步骤完成情况,来决定是再次调用planner
,还是调用researcher
或coder
。循环与结束:通过这种状态更新和条件流转,流程可以在不同的节点间跳转,形成循环(例如,计划调整后重新执行),直到达到某个结束条件(例如,报告生成完毕并流向
END
节点)。
关键代码片段赏析
让我们看几个关键片段来加深理解:
1. planner_node
(src/graph/nodes.py) - 计划生成与迭代控制
Python
def planner_node(
state: State, config: RunnableConfig
) -> Command[Literal["human_feedback", "reporter"]]:
# ...
plan_iterations = state["plan_iterations"] if state.get("plan_iterations", 0) else 0
# ...
if plan_iterations >= configurable.max_plan_iterations: # 控制最大迭代次数
return Command(goto="reporter")
# ... (调用LLM生成计划) ...
try:
curr_plan = json.loads(repair_json_output(full_response))
except json.JSONDecodeError:
# ... (处理错误) ...
return Command(goto="__end__") # 或 reporter
if curr_plan.get("has_enough_context"): # 根据计划内容决定下一步
new_plan = Plan.model_validate(curr_plan)
return Command(
update={
"messages": [AIMessage(content=full_response, name="planner")],
"current_plan": new_plan,
},
goto="reporter", # 如果上下文充足,直接去报告
)
return Command(
update={
"messages": [AIMessage(content=full_response, name="planner")],
"current_plan": full_response,
},
goto="human_feedback", # 否则,需要人工反馈
)
这个片段展示了计划器如何根据迭代次数、LLM的输出(是否包含足够上下文)来更新状态(current_plan
)并决定下一个节点是 reporter
还是 human_feedback
。
2. research_team_node
(src/graph/nodes.py) - 任务分发
Python
def research_team_node(
state: State,
) -> Command[Literal["planner", "researcher", "coder"]]:
logger.info("Research team is collaborating on tasks.")
current_plan = state.get("current_plan")
if not current_plan or not current_plan.steps: # 如果没有计划或步骤,返回planner
return Command(goto="planner")
if all(step.execution_res for step in current_plan.steps): # 如果所有步骤都已执行,返回planner(可能进行下一轮计划或总结)
return Command(goto="planner")
# 找到第一个未执行的步骤
for step in current_plan.steps:
if not step.execution_res:
break
if step.step_type and step.step_type == StepType.RESEARCH: # 根据步骤类型决定调用哪个Agent
return Command(goto="researcher")
if step.step_type and step.step_type == StepType.PROCESSING:
return Command(goto="coder")
return Command(goto="planner") # 默认或未知类型返回planner
research_team_node
扮演了一个调度者的角色。它检查当前计划 current_plan
的执行状态,如果所有步骤都完成了,或者计划有问题,它会把控制权交还给 planner_node
。否则,它会找到第一个未完成的步骤,并根据该步骤的类型(StepType.RESEARCH
或 StepType.PROCESSING
)来决定是将任务分配给 researcher_node
还是 coder_node
。
3. _execute_agent_step
辅助函数 (src/graph/nodes.py) - 执行单个研究步骤并更新观察结果
Python
async def _execute_agent_step(
state: State, agent, agent_name: str
) -> Command[Literal["research_team"]]:
# ... (找到当前未执行的步骤 current_step) ...
# ... (准备 agent_input, 可能包含已完成步骤的信息) ...
result = await agent.ainvoke(input=agent_input) # 异步调用具体的Agent (research_agent 或 coder_agent)
response_content = result["messages"][-1].content
current_step.execution_res = response_content # 将Agent的输出存到步骤的执行结果中
return Command(
update={
"messages": [HumanMessage(content=response_content, name=agent_name)],
"observations": state.get("observations", []) + [response_content], # 将结果追加到总的观察列表中
},
goto="research_team", # 返回 research_team 进行下一轮调度
)
这个函数(被 researcher_node
和 coder_node
调用)清晰地展示了单个研究步骤是如何被执行的:
Agent(如
research_agent
)被调用来处理当前步骤。Agent的输出(
response_content
)被记录为该步骤的执行结果 (current_step.execution_res
)。同时,这个输出也被添加到一个全局的
observations
列表中,这个列表会累积所有步骤的发现,最终用于生成报告。状态更新后,流程返回到
research_team_node
,由它来决定下一个动作。
小结:Deer Flow 如何巧妙利用 LangGraph 实现复杂的研究流程
通过上述分析,我们可以看到 Deer Flow 非常巧妙地利用了 LangGraph 的核心特性:
清晰的状态定义 (
State
):将复杂研究流程中需要跟踪的所有信息(用户请求、计划、中间发现、迭代次数等)都封装在统一的State
对象中。模块化的节点 (
nodes.py
):将不同的功能单元(如协调、计划、研究、编码、报告、人工反馈)实现为独立的节点,每个节点职责单一,易于维护和扩展。灵活的图构建 (
builder.py
):通过定义节点间的固定边和条件边,构建出能够反映复杂研究逻辑(包括迭代、分支、人工介入)的工作流图。自动化的流程驱动 (
workflow.py
):一旦图构建完成,LangGraph 会自动根据状态和图的定义来驱动整个流程的执行。
这种基于图和状态的架构,使得 Deer Flow 能够以一种结构化、可控的方式来编排多个Agent和工具,从而完成复杂的研究任务,并能方便地进行迭代优化和功能扩展。
关注我,不迷路!也欢迎大家关注我的公众号《编程挺好玩》,交流讨论更方便~