今天我们来深入了解 Deer Flow 的核心架构,特别是它如何利用 LangGraph 进行状态管理,让我们一步步解析 Deer Flow 的精妙之处。

LangGraph 简介:为什么选择 LangGraph?

Deer Flow 选择 LangGraph 作为其核心流程控制框架,主要是看中了 LangGraph 在构建复杂代理(Agent)系统方面的强大能力。我们可以从以下几个方面理解其优势:

  • Stateful (状态管理):LangGraph 的核心设计就是围绕状态(State)进行的。在多步骤、多Agent协作的复杂流程中,能够清晰地定义、传递和更新状态至关重要。Deer Flow 正是利用这一点来管理其研究(Research)过程中的各种信息和中间结果。

  • Multi-actor (多参与者):现代AI应用,尤其是像 Deer Flow 这样的研究型AI助手,往往需要多个具有不同角色的Agent协同工作(例如计划、执行研究、编码、报告等)。LangGraph 提供了构建多Agent图(Graph)的能力,使得每个Agent可以作为图中的一个节点(Node),并定义它们之间的协作流程。

  • Cycles (循环与迭代):研究过程往往不是线性的,可能需要多次迭代、反思和修正。LangGraph 支持在图中构建循环(Cycles),这使得 Deer Flow 能够实现如“计划-执行-反馈-修正计划”这样的迭代式工作流,从而提升最终结果的质量。

Deer Flow 中的“状态”核心:State 详解

在 Deer Flow 中,整个研究流程的状态管理是通过一个核心的 State 类来实现的。这个类定义在 src/graph/types.py 文件中。

代码导览:src/graph/types.py - State 的定义

State 类继承自 LangGraph 的 MessagesState,并在此基础上扩展了一些 Deer Flow 特有的状态属性,用于在不同的Agent节点之间传递信息、控制流程。

Python

# Copyright (c) 2025 Bytedance Ltd. and/or its affiliates
# SPDX-License-Identifier: MIT

import operator
from typing import Annotated

from langgraph.graph import MessagesState

from src.prompts.planner_model import Plan


class State(MessagesState):
    """State for the agent system, extends MessagesState with next field."""

    # Runtime Variables
    locale: str = "en-US"
    observations: list[str] = []
    plan_iterations: int = 0
    current_plan: Plan | str = None
    final_report: str = ""
    auto_accepted_plan: bool = False
    enable_background_investigation: bool = True
    background_investigation_results: str = None

关键属性解读:

  • messages: 继承自 MessagesState,用于存储对话历史和Agent间的消息传递。

  • locale: 当前用户交互的语言环境,例如 "en-US"。

  • observations: 一个列表,用于收集各个研究步骤中产生的观察结果或发现。

  • plan_iterations: 记录计划(Plan)被修改或迭代的次数。

  • current_plan: 当前正在执行的研究计划,可以是结构化的 Plan 对象或其字符串表示。

  • final_report: 最终生成的研究报告。

  • auto_accepted_plan: 布尔值,指示计划是否被自动接受(例如,在不需要人工审核的情况下)。

  • enable_background_investigation: 布尔值,控制是否在制定计划前进行背景调查。

  • background_investigation_results: 存储背景调查的结果。

这个 State 对象就像一个“接力棒”,在图的各个节点间传递,每个节点根据当前状态执行任务,并可以更新状态,然后传递给下一个节点。

图的构建:src/workflow.pysrc/graph/builder.py

Deer Flow 的核心工作流程是通过 LangGraph 构建的一个状态图(StateGraph)来驱动的。

src/graph/builder.py 中的图构建逻辑

这个文件负责定义和构建整个Agent工作流图。

Python

# Copyright (c) 2025 Bytedance Ltd. and/or its affiliates
# SPDX-License-Identifier: MIT

from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver

from .types import State
from .nodes import (
    coordinator_node,
    planner_node,
    reporter_node,
    research_team_node,
    researcher_node,
    coder_node,
    human_feedback_node,
    background_investigation_node,
)


def _build_base_graph():
    """Build and return the base state graph with all nodes and edges."""
    builder = StateGraph(State)
    builder.add_edge(START, "coordinator")
    builder.add_node("coordinator", coordinator_node)
    builder.add_node("background_investigator", background_investigation_node)
    builder.add_node("planner", planner_node)
    builder.add_node("reporter", reporter_node)
    builder.add_node("research_team", research_team_node)
    builder.add_node("researcher", researcher_node)
    builder.add_node("coder", coder_node)
    builder.add_node("human_feedback", human_feedback_node)
    builder.add_edge("reporter", END)
    return builder


def build_graph_with_memory():
    """Build and return the agent workflow graph with memory."""
    # use persistent memory to save conversation history
    # TODO: be compatible with SQLite / PostgreSQL
    memory = MemorySaver()

    # build state graph
    builder = _build_base_graph()
    return builder.compile(checkpointer=memory)


def build_graph():
    """Build and return the agent workflow graph without memory."""
    # build state graph
    builder = _build_base_graph()
    return builder.compile()


graph = build_graph()

关键点:

  1. StateGraph(State): 创建一个状态图实例,并指定使用我们之前定义的 State 类作为图的状态模型。

  2. add_node(name, action): 将各个Agent(或其他逻辑处理单元)作为节点添加到图中。这些节点的具体实现在 src/graph/nodes.py 中。

  3. add_edge(start_node, end_node): 定义节点之间的直接连接。

  4. compile(): 将定义好的图结构编译成可执行的工作流。

src/workflow.py 中的工作流驱动

这个文件利用 src/graph/builder.py中构建好的图,来实际运行Agent工作流。

Python

# Copyright (c) 2025 Bytedance Ltd. and/or its affiliates
# SPDX-License-Identifier: MIT

import asyncio
import logging
from src.graph import build_graph # 导入构建图的函数

# ... (logging configuration) ...

logger = logging.getLogger(__name__)

# Create the graph
graph = build_graph() # 构建图实例


async def run_agent_workflow_async(
    user_input: str,
    # ... (other parameters) ...
):
    # ... (input validation and debug setup) ...
    logger.info(f"Starting async workflow with user input: {user_input}")
    initial_state = {
        "messages": [{"role": "user", "content": user_input}],
        "auto_accepted_plan": True,
        "enable_background_investigation": enable_background_investigation,
    }
    config = {
        # ... (configuration for the run) ...
    }
    last_message_cnt = 0
    async for s in graph.astream( # 异步执行图
        input=initial_state, config=config, stream_mode="values"
    ):
        # ... (process and print stream output) ...
    logger.info("Async workflow completed successfully")

if __name__ == "__main__":
    print(graph.get_graph(xray=True).draw_mermaid()) # 可以打印出图的 Mermaid 定义,便于可视化

核心在于:

  1. graph = build_graph(): 获取编译好的工作流图。

  2. graph.astream(input=initial_state, ...): 使用初始的用户输入(包装在 initial_state 中)来异步启动并流式执行这个图。

Agent 如何成为图中的节点:src/graph/nodes.py 的角色

src/graph/nodes.py 文件是 Deer Flow 核心逻辑的汇集地,它定义了图中各个节点的具体行为。每个节点通常对应一个Agent或者一个特定的处理阶段。

例如,文件中包含以下节点(部分列举):

  • coordinator_node: 协调器节点,通常是流程的入口,负责初步理解用户意图并决定是否转交给计划器。

  • background_investigation_node: 背景调查节点,在正式计划前进行初步的信息搜集。

  • planner_node: 计划器节点,负责根据当前状态(可能包含用户输入、背景调查结果等)生成详细的研究计划。

  • human_feedback_node: 人工反馈节点,允许用户审查计划并提供反馈,实现人机协作。

  • research_team_node: 研究团队节点,这是一个调度节点,根据计划中未完成的步骤,决定接下来是调用研究员(researcher_node)还是编码员(coder_node)。

  • researcher_node: 研究员节点,执行具体的研究任务(例如网络搜索、文档阅读)。

  • coder_node: 编码员(或数据处理)节点,执行需要代码处理的任务(例如运行Python脚本分析数据)。

  • reporter_node: 报告员节点,根据收集到的所有信息和观察结果,撰写最终的研究报告。

每个这样的节点函数接收当前的 State 对象作为输入,执行其特定的逻辑(通常会调用大语言模型LLM,并可能使用各种工具Tool),然后返回一个 Command 对象或一个字典来更新状态,并可能指定下一个要跳转到的节点(这通常通过 builder.py 中的条件边定义)。

状态在图中如何流转与更新

状态的流转和更新是 LangGraph 的核心机制,在 Deer Flow 中体现得淋漓尽致:

  1. 初始状态:工作流从 workflow.py 中定义的 initial_state 开始,通常包含用户的初始请求。

  2. 节点执行:LangGraph 根据图的定义(边的连接和条件)将当前状态传递给相应的节点函数(定义在 nodes.py 中)。

  3. 状态读取与任务执行:节点函数读取传入 State 中的信息,执行其任务。例如,planner_node 会读取 state["messages"]state["background_investigation_results"] 来制定计划。

  4. 状态更新:节点函数执行完毕后,会返回一个字典,这个字典的键值对会被用来更新 State 对象。

    例如,planner_node 在生成计划后,可能会返回:

    Python

    return Command(
        update={
            "messages": [AIMessage(content=full_response, name="planner")],
            "current_plan": new_plan, # 更新当前计划
        },
        goto="reporter", # 或者 "human_feedback"
    )
    

    这里的 update 字典的内容会合并到当前的 State 中。

  5. 条件流转:在 builder.py 中定义的条件边(Conditional Edges)会检查更新后的 State,根据预设的逻辑(通常是一个返回节点名的函数)决定下一个激活哪个节点。例如,research_team_node 会检查 current_plan 中的步骤完成情况,来决定是再次调用 planner,还是调用 researchercoder

  6. 循环与结束:通过这种状态更新和条件流转,流程可以在不同的节点间跳转,形成循环(例如,计划调整后重新执行),直到达到某个结束条件(例如,报告生成完毕并流向 END 节点)。

关键代码片段赏析

让我们看几个关键片段来加深理解:

1. planner_node (src/graph/nodes.py) - 计划生成与迭代控制

Python

def planner_node(
    state: State, config: RunnableConfig
) -> Command[Literal["human_feedback", "reporter"]]:
    # ...
    plan_iterations = state["plan_iterations"] if state.get("plan_iterations", 0) else 0
    # ...
    if plan_iterations >= configurable.max_plan_iterations: # 控制最大迭代次数
        return Command(goto="reporter")
    # ... (调用LLM生成计划) ...
    try:
        curr_plan = json.loads(repair_json_output(full_response))
    except json.JSONDecodeError:
        # ... (处理错误) ...
        return Command(goto="__end__") # 或 reporter
    
    if curr_plan.get("has_enough_context"): # 根据计划内容决定下一步
        new_plan = Plan.model_validate(curr_plan)
        return Command(
            update={
                "messages": [AIMessage(content=full_response, name="planner")],
                "current_plan": new_plan,
            },
            goto="reporter", # 如果上下文充足,直接去报告
        )
    return Command(
        update={
            "messages": [AIMessage(content=full_response, name="planner")],
            "current_plan": full_response,
        },
        goto="human_feedback", # 否则,需要人工反馈
    )

这个片段展示了计划器如何根据迭代次数、LLM的输出(是否包含足够上下文)来更新状态(current_plan)并决定下一个节点是 reporter 还是 human_feedback

2. research_team_node (src/graph/nodes.py) - 任务分发

Python

def research_team_node(
    state: State,
) -> Command[Literal["planner", "researcher", "coder"]]:
    logger.info("Research team is collaborating on tasks.")
    current_plan = state.get("current_plan")
    if not current_plan or not current_plan.steps: # 如果没有计划或步骤,返回planner
        return Command(goto="planner")
    if all(step.execution_res for step in current_plan.steps): # 如果所有步骤都已执行,返回planner(可能进行下一轮计划或总结)
        return Command(goto="planner")
    
    # 找到第一个未执行的步骤
    for step in current_plan.steps:
        if not step.execution_res:
            break
            
    if step.step_type and step.step_type == StepType.RESEARCH: # 根据步骤类型决定调用哪个Agent
        return Command(goto="researcher")
    if step.step_type and step.step_type == StepType.PROCESSING:
        return Command(goto="coder")
        
    return Command(goto="planner") # 默认或未知类型返回planner

research_team_node 扮演了一个调度者的角色。它检查当前计划 current_plan 的执行状态,如果所有步骤都完成了,或者计划有问题,它会把控制权交还给 planner_node。否则,它会找到第一个未完成的步骤,并根据该步骤的类型(StepType.RESEARCHStepType.PROCESSING)来决定是将任务分配给 researcher_node 还是 coder_node

3. _execute_agent_step 辅助函数 (src/graph/nodes.py) - 执行单个研究步骤并更新观察结果

Python

async def _execute_agent_step(
    state: State, agent, agent_name: str
) -> Command[Literal["research_team"]]:
    # ... (找到当前未执行的步骤 current_step) ...

    # ... (准备 agent_input, 可能包含已完成步骤的信息) ...
    
    result = await agent.ainvoke(input=agent_input) # 异步调用具体的Agent (research_agent 或 coder_agent)
    response_content = result["messages"][-1].content
    
    current_step.execution_res = response_content # 将Agent的输出存到步骤的执行结果中
    
    return Command(
        update={
            "messages": [HumanMessage(content=response_content, name=agent_name)],
            "observations": state.get("observations", []) + [response_content], # 将结果追加到总的观察列表中
        },
        goto="research_team", # 返回 research_team 进行下一轮调度
    )

这个函数(被 researcher_nodecoder_node 调用)清晰地展示了单个研究步骤是如何被执行的:

  1. Agent(如 research_agent)被调用来处理当前步骤。

  2. Agent的输出(response_content)被记录为该步骤的执行结果 (current_step.execution_res)。

  3. 同时,这个输出也被添加到一个全局的 observations 列表中,这个列表会累积所有步骤的发现,最终用于生成报告。

  4. 状态更新后,流程返回到 research_team_node,由它来决定下一个动作。

小结:Deer Flow 如何巧妙利用 LangGraph 实现复杂的研究流程

通过上述分析,我们可以看到 Deer Flow 非常巧妙地利用了 LangGraph 的核心特性:

  1. 清晰的状态定义 (State):将复杂研究流程中需要跟踪的所有信息(用户请求、计划、中间发现、迭代次数等)都封装在统一的 State 对象中。

  2. 模块化的节点 (nodes.py):将不同的功能单元(如协调、计划、研究、编码、报告、人工反馈)实现为独立的节点,每个节点职责单一,易于维护和扩展。

  3. 灵活的图构建 (builder.py):通过定义节点间的固定边和条件边,构建出能够反映复杂研究逻辑(包括迭代、分支、人工介入)的工作流图。

  4. 自动化的流程驱动 (workflow.py):一旦图构建完成,LangGraph 会自动根据状态和图的定义来驱动整个流程的执行。

这种基于图和状态的架构,使得 Deer Flow 能够以一种结构化、可控的方式来编排多个Agent和工具,从而完成复杂的研究任务,并能方便地进行迭代优化和功能扩展。

关注我,不迷路!也欢迎大家关注我的公众号《编程挺好玩》,交流讨论更方便~

Snipaste_2024-12-07_11-05-00.png