Lesson 5: The Reflexion Architecture (Agents with Reflection)

Engineering topics:

  • 5.1 The structure of a Reflexion prompt
  • 5.2 Agent self-correction (Self-Feedback)
  • 5.3 How an agent "learns from failure"
  • 5.4 Writing reflections into memory (Reflection → Memory)
  • 5.5 Reflexion + ReAct: the integrated architecture

The engineering view in one sentence: Reflexion = when a step fails or produces low-quality output, record the "error cause + improvement strategy" in structured form and inject it into the next decision, so the agent genuinely becomes more reliable.


5.1 The Structure of a Reflexion Prompt

A Reflexion prompt is not just "please reflect on this". It must be structured; otherwise the reflection degenerates into empty talk.

A recommended engineering structure (ready to use):

A) A Reflection (the reflection artifact) should contain 5 fields

  1. Failure Signal (what triggered the reflection)
    • validator failed / tool error / loop / wrong answer / constraint violation / user dissatisfaction
  2. Root Cause
    • prompt ambiguous / schema mismatch / missing observation / bad plan / tool misuse
  3. Fix Strategy
    • revise the prompt / revise the tool schema / add a check / switch tools / decompose the task
  4. Patch (an executable patch)
    • the system prompt / rules / steps to add in the next round (must be executable text)
  5. Memory Write (what gets written into memory)
    • short, retrievable, reusable (no long essays)

B) Reflexion output format (JSON recommended)

{
  "failure_signal": "...",
  "root_cause": "...",
  "fix_strategy": ["...", "..."],
  "patch_prompt": "...",
  "memory_to_write": [
    {"type": "rule", "text": "..."},
    {"type": "pattern", "text": "..."}
  ]
}
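
Before trusting this output, validate it like any other model output. A minimal sketch, assuming the five-field schema above (validate_reflection is a hypothetical helper, not part of any library):

def validate_reflection(obj: dict):
    # Check that all five fields exist with the expected shapes.
    required = {
        "failure_signal": str,
        "root_cause": str,
        "fix_strategy": list,
        "patch_prompt": str,
        "memory_to_write": list,
    }
    for key, typ in required.items():
        if key not in obj:
            return False, f"missing field: {key}"
        if not isinstance(obj[key], typ):
            return False, f"{key} must be {typ.__name__}"
    for item in obj["memory_to_write"]:
        if not isinstance(item, dict) or item.get("type") not in ("rule", "pattern"):
            return False, "memory_to_write items need type=rule|pattern"
        if not isinstance(item.get("text"), str) or not item["text"].strip():
            return False, "memory_to_write items need non-empty text"
    return True, "ok"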

5.2 Agent Self-Correction (Self-Feedback)

The key to Self-Feedback is not "the model saying it was wrong", but this:

You must provide verifiable error evidence and make the model correct itself against that evidence.

Common sources of error evidence:

  • JSON parse error
  • schema validator error
  • tool error (timeout / 429 / missing parameters)
  • loop detection (repeated tool_call / repeated query)
  • task acceptance failure (e.g. you have a checker and the answer violates a constraint)

In engineering terms, Self-Feedback happens at two points:

  1. Within-round correction (your earlier decide_with_retry already does this; a sketch follows this list)
  2. Cross-round correction: on repeated failures or a failed acceptance check → trigger Reflexion, generate a "patch prompt", and inject it into the next round (the focus of this lesson)
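
For reference, a minimal sketch of what the within-round retry can look like, assuming the call_llm() helper defined in the code later in this lesson and a validate(obj) callback supplied by the caller (the exact wiring is an assumption):

import json

def decide_with_retry(messages, validate, max_retries=2):
    # Same-round self-correction: feed concrete failure evidence back to the model.
    for _ in range(max_retries + 1):
        raw = call_llm(messages)
        try:
            obj = json.loads(raw)
        except json.JSONDecodeError as e:
            evidence = f"JSON parse error: {e}"
        else:
            ok, reason = validate(obj)
            if ok:
                return obj
            evidence = f"Validator failure: {reason}"
        # Append the evidence so the next attempt corrects against it.
        messages = messages + [{
            "role": "system",
            "content": f"Your previous attempt failed: {evidence}. Fix it and output valid JSON."
        }]
    raise RuntimeError("decide_with_retry: exhausted retries")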

5.3 How an Agent "Learns from Failure"

"Learning" here does not mean training parameters; it means learning a policy (an engineering-grade weakening of policy learning):

  • Turn each failure into a tuple: (context, failure, patch)
  • Next time a similar context appears: inject the patch first

You can think of it as:

a lightweight "experience replay + rule patching system"

The two most common kinds of reusable experience in practice (a storage sketch follows this list):

  1. Rule memory (Rule Memory)
    • "When action=tool_call, final must be null"
    • "Any external fact must come from an Observation"
  2. Pattern memory (Pattern Memory)
    • "For scheduling questions, enumerate candidate slots first, then rule out conflicts one by one"
    • "Parsing boundary conditions: after = strictly greater than"
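
A minimal sketch of the (context, failure, patch) idea, using naive word overlap as the similarity signal (ExperienceStore and its matching rule are illustrative assumptions, to be replaced by real retrieval later):

from dataclasses import dataclass, field
from typing import List, Optional

@dataclass
class Experience:
    context: str  # short description of the task situation
    failure: str  # what went wrong
    patch: str    # executable patch text for the next attempt

@dataclass
class ExperienceStore:
    items: List[Experience] = field(default_factory=list)

    def add(self, context: str, failure: str, patch: str):
        self.items.append(Experience(context, failure, patch))

    def best_patch(self, context: str) -> Optional[str]:
        # Naive similarity: count shared words; a vector index would replace this.
        words = set(context.lower().split())
        best, best_score = None, 0
        for exp in self.items:
            score = len(words & set(exp.context.lower().split()))
            if score > best_score:
                best, best_score = exp.patch, score
        return best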

5.4 Writing Reflections into Memory (Reflection → Memory)

You do not need a vector store yet; the minimal viable version:

  • memory_rules: List[str]
  • memory_patterns: List[str]

Write policy (engineering advice; a tagged-entry example follows this list):

  • Write only entries that are reusable, executable, and phrased as short sentences
  • No emotional summaries
  • No restating of task details
  • Attach tags/topic to each memory entry (for future retrieval)
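
For example, entries might look like this (the tag keys here are illustrative, not a fixed schema):

memory_entries = [
    {"type": "rule",
     "text": "When action=tool_call, final must be null",
     "tags": {"topic": "output_schema"}},
    {"type": "pattern",
     "text": "Enumerate candidate slots first, then filter out conflicts",
     "tags": {"topic": "time_constraints"}},
]

# Later, retrieval can filter by topic before injecting into the prompt:
relevant = [e for e in memory_entries if e["tags"]["topic"] == "time_constraints"]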

5.5 Reflexion + ReAct: The Integrated Architecture

You already have a loop:

decide → tool → observation → decide → ...

Reflexion is added as a side path:

decide_with_retry fails / checker rejects / loop detection fires
        ↓
reflect(error_evidence, context)
        ↓
write memory (rules/patterns)
        ↓
patch injected into the next decide

In other words: Reflexion does not replace ReAct; it lets ReAct "upgrade its strategy" after a failure.


Example code: Reflexion + ReAct (runnable as-is)

The code below builds on your earlier "decision JSON + tool_call + observation" framework and adds:

  • reflect(): generates the reflection JSON
  • MemoryStore: stores rule/pattern memories
  • run_reflexion_agent(): triggers Reflexion on failure and injects the patch into the next decision

I still use a fake search tool so the run reproduces reliably.

import os, json, requests
from dataclasses import dataclass, field
from typing import Any, Dict, Optional, Tuple, List

API_KEY = os.getenv("COMET_API_KEY")
BASE_URL = "https://api.cometapi.com"
CHAT_URL = f"{BASE_URL}/v1/chat/completions"
if not API_KEY:
    raise RuntimeError("COMET_API_KEY not set")

# -------------------------
# Tools
# -------------------------
def tool_search(query: str) -> str:
    # Deliberately fails on certain inputs, to trigger Reflexion
    if "FAIL" in query:
        raise RuntimeError("Simulated tool failure")
    return f"[SEARCH_RESULT] query={query}\n- 'after 09:00' usually means strictly later than 09:00."

TOOLS = {"search": tool_search}
TOOL_SCHEMAS = {
    "search": {
        "parameters": {
            "type": "object",
            "properties": {"query": {"type": "string"}},
            "required": ["query"],
            "additionalProperties": False
        }
    }
}

ALLOWED_ACTIONS = {"tool_call", "finish", "replan", "ask_user"}

# -------------------------
# Memory Store
# -------------------------
@dataclass
class MemoryStore:
    rules: List[str] = field(default_factory=list)
    patterns: List[str] = field(default_factory=list)

    def inject_text(self) -> str:
        # "Executable memory" to inject into the prompt
        chunks = []
        if self.rules:
            chunks.append("Rules learned:\n- " + "\n- ".join(self.rules))
        if self.patterns:
            chunks.append("Patterns learned:\n- " + "\n- ".join(self.patterns))
        return "\n\n".join(chunks).strip()

    def write(self, items: List[Dict[str, str]]):
        for it in items:
            t = it.get("type")
            text = it.get("text", "").strip()
            if not text:
                continue
            if t == "rule" and text not in self.rules:
                self.rules.append(text)
            if t == "pattern" and text not in self.patterns:
                self.patterns.append(text)

# -------------------------
# LLM Call
# -------------------------
def call_llm(messages, model="gpt-4o", max_tokens=400, temperature=0.2) -> str:
    headers = {"Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json"}
    payload = {"model": model, "messages": messages, "max_tokens": max_tokens, "temperature": temperature}
    r = requests.post(CHAT_URL, headers=headers, json=payload, timeout=30)
    r.raise_for_status()
    return r.json()["choices"][0]["message"]["content"]

# -------------------------
# Decision Prompt (ReAct + CoT)
# -------------------------
DECISION_SYSTEM = f"""
You are an agent decision engine.

Output exactly ONE JSON object, no extra text.

Allowed actions: {sorted(ALLOWED_ACTIONS)}.
Allowed tools: {sorted(TOOLS.keys())}.

Decision schema:
{{
  "thought": "string",
  "action": "tool_call|finish|replan|ask_user",
  "tool": "string|null",
  "tool_input": "object|null",
  "final": "string|null"
}}

Rules:
- "thought" cannot include external facts. External facts MUST come from Observation.
- If action="tool_call": final must be null.
- If action="finish": tool/tool_input must be null.
Tool schema:
{json.dumps(TOOL_SCHEMAS, ensure_ascii=False)}
""".strip()

def validate_decision(obj: Dict[str, Any], tools_available: set) -> Tuple[bool, str]:
    for k in ("thought","action","tool","tool_input","final"):
        if k not in obj:
            return False, f"Missing key: {k}"
    if obj["action"] not in ALLOWED_ACTIONS:
        return False, f"Invalid action: {obj['action']}"

    if obj["action"] == "tool_call":
        if not isinstance(obj["tool"], str) or obj["tool"] not in tools_available:
            return False, "Invalid tool"
        if not isinstance(obj["tool_input"], dict):
            return False, "tool_input must be object"
        # required query
        if "query" not in obj["tool_input"] or not isinstance(obj["tool_input"]["query"], str):
            return False, "tool_input.query required and must be string"
        if obj["final"] is not None:
            return False, "final must be null when tool_call"
    else:
        if obj["tool"] is not None or obj["tool_input"] is not None:
            return False, "tool/tool_input must be null when not tool_call"
        if not isinstance(obj["final"], str):
            return False, "final must be string when not tool_call"
    return True, "ok"

def decide(state: Dict[str, Any], memory: MemoryStore, observation: Optional[str], err_hint: Optional[str]=None) -> Dict[str, Any]:
    msgs = [{"role":"system","content":DECISION_SYSTEM}]
    mem_text = memory.inject_text()
    if mem_text:
        msgs.append({"role":"system","content":f"Memory:\n{mem_text}"})

    msgs.append({"role":"user","content":json.dumps({"state": state}, ensure_ascii=False)})

    if observation:
        msgs.append({"role":"system","content":f"Observation:\n{observation}"})

    if err_hint:
        msgs.append({"role":"system","content":f"Your previous attempt failed: {err_hint}. Fix it and output valid JSON."})

    raw = call_llm(msgs, temperature=0.2, max_tokens=350)
    return json.loads(raw)

# -------------------------
# Reflexion Prompt
# -------------------------
REFLEX_SYSTEM = """
You are a reflexion module for an agent.

Given:
- the goal/state
- the last decision JSON
- the error evidence (validator/tool/loop/checker failure)
Produce a JSON reflection that contains:
{
  "failure_signal": "...",
  "root_cause": "...",
  "fix_strategy": ["...", "..."],
  "patch_prompt": "A short patch to be injected into the decision prompt",
  "memory_to_write": [{"type":"rule|pattern","text":"..."}]
}

Constraints:
- Make patch_prompt directly executable (imperative rules).
- memory_to_write must be short and reusable.
Output exactly one JSON object, no extra text.
""".strip()

def reflect(state: Dict[str, Any], last_decision: Dict[str, Any], error_evidence: str) -> Dict[str, Any]:
    msgs = [
        {"role":"system","content":REFLEX_SYSTEM},
        {"role":"user","content":json.dumps({
            "state": state,
            "last_decision": last_decision,
            "error_evidence": error_evidence
        }, ensure_ascii=False)}
    ]
    raw = call_llm(msgs, temperature=0.2, max_tokens=350)
    return json.loads(raw)

# -------------------------
# Agent Loop with Reflexion
# -------------------------
def run_reflexion_agent(goal: str, max_steps: int = 6, max_reflections: int = 2) -> str:
    memory = MemoryStore()
    state = {"goal": goal}
    tools_available = set(TOOLS.keys())
    observation = None

    reflection_count = 0
    last_decision = None

    for step in range(max_steps):
        # Decide; any patch from a previous reflection is carried inside `state`.
        try:
            decision = decide(state, memory, observation)
        except Exception as e:
            # JSON parsing itself failed: treat it directly as error evidence
            err = f"Decision JSON parse error: {type(e).__name__}"
            last_decision = {"raw_error": err}
            if reflection_count < max_reflections:
                refl = reflect(state, last_decision, err)
                memory.write(refl.get("memory_to_write", []))
                state["patch"] = refl.get("patch_prompt", "")
                reflection_count += 1
                observation = None
                continue
            return f"Failed: {err}"

        ok, reason = validate_decision(decision, tools_available)
        if not ok:
            # validator failed → trigger Reflexion
            last_decision = decision
            if reflection_count < max_reflections:
                refl = reflect(state, last_decision, f"Validator failure: {reason}")
                memory.write(refl.get("memory_to_write", []))
                state["patch"] = refl.get("patch_prompt", "")
                reflection_count += 1
                observation = None
                continue
            return f"Failed: validator repeatedly failed: {reason}"

        # If a patch exists, inject it as a state note (minimal implementation)
        if state.get("patch"):
            state["note"] = state["patch"]

        # Execute the action
        if decision["action"] == "finish":
            return decision["final"]

        if decision["action"] == "ask_user":
            return decision["final"]

        if decision["action"] == "replan":
            state["note"] = "Replan requested. Consider tools or ask_user."
            observation = None
            continue

        if decision["action"] == "tool_call":
            tool = decision["tool"]
            tool_input = decision["tool_input"]
            try:
                observation = TOOLS[tool](**tool_input)
            except Exception as e:
                # tool failed → trigger Reflexion (use the failure as evidence)
                last_decision = decision
                err = f"Tool error: {type(e).__name__}: {e}"
                if reflection_count < max_reflections:
                    refl = reflect(state, last_decision, err)
                    memory.write(refl.get("memory_to_write", []))
                    state["patch"] = refl.get("patch_prompt", "")
                    reflection_count += 1
                    observation = None
                    continue
                return f"Failed: {err}"

    return "Failed: exceeded max_steps"

if __name__ == "__main__":
    print(run_reflexion_agent(
        goal="Solve this scheduling problem: must start on the hour, strictly after 09:00, end by 14:00; events: 09:00-09:30, 10:00-11:00, 12:00-12:30, 13:00-14:00. List possible start times and earliest."
    ))

Engineering deployment advice (you must use this when you later build systems like MemeMiner or TreeChat)

1) When should Reflexion be triggered?

Trigger it only on explicit failure signals (a trigger-policy sketch follows this list):

  • validator failures exceed a threshold
  • tool error
  • loop detection (the same step repeated more than twice)
  • checker rejection (an acceptance checker you write yourself)

Do not reflect on every round; it is costly and prone to overfitting.
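
A minimal sketch of such a trigger policy (the FailureCounters fields and thresholds are assumptions for illustration):

from dataclasses import dataclass

@dataclass
class FailureCounters:
    validator_failures: int = 0
    tool_errors: int = 0
    repeated_steps: int = 0
    checker_rejected: bool = False

def should_reflect(c: FailureCounters,
                   validator_threshold: int = 2,
                   loop_threshold: int = 2) -> bool:
    # Reflect only on explicit failure signals, never on every round.
    return (c.validator_failures >= validator_threshold
            or c.tool_errors > 0
            or c.repeated_steps > loop_threshold
            or c.checker_rejected)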

2) How do reflections get written into long-term memory?

As a next step you will:

  • write reflections into a vector store (FAISS / Milvus)
  • attach tags at write time: {"topic":"time_constraints", "pattern":"enumerate candidates then filter"}
  • at retrieval time, inject patches from similar tasks (see the sketch below)
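
As a rough sketch with FAISS (the embed() function is a stand-in you would replace with a real embedding model; the dimension is arbitrary):

import numpy as np
import faiss  # pip install faiss-cpu

DIM = 384  # placeholder embedding dimension

def embed(text: str) -> np.ndarray:
    # Stand-in pseudo-embedding (stable within a process); swap in a real encoder.
    rng = np.random.default_rng(abs(hash(text)) % (2**32))
    return rng.random(DIM, dtype=np.float32)

index = faiss.IndexFlatL2(DIM)
entries = []  # entries[i] corresponds to vector i in the index

def write_reflection(text: str, tags: dict):
    index.add(embed(text).reshape(1, -1))
    entries.append({"text": text, "tags": tags})

def retrieve_similar(task_description: str, k: int = 3):
    if index.ntotal == 0:
        return []
    _, ids = index.search(embed(task_description).reshape(1, -1), min(k, index.ntotal))
    return [entries[i] for i in ids[0]]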

The extension points you should learn next (my recommendation)

For Reflexion to genuinely "look like learning", you are still missing two components:

  1. Checker (acceptance checker): automatically decides whether an answer satisfies the constraints (a sketch follows this list)
  2. Memory Retrieval: injects relevant reflections based on task similarity
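
A minimal checker sketch for this lesson's scheduling demo (the constraints come from the demo goal; I additionally assume a 1-hour meeting duration, which the goal leaves unspecified):

def check_schedule_answer(claimed_starts):
    # Demo constraints: start on the hour, strictly after 09:00, end by 14:00;
    # assumed duration: 60 minutes; must not overlap the existing events.
    events = [("09:00", "09:30"), ("10:00", "11:00"),
              ("12:00", "12:30"), ("13:00", "14:00")]
    to_min = lambda t: int(t[:2]) * 60 + int(t[3:])
    valid = set()
    for h in range(10, 14):  # on-the-hour starts strictly after 09:00
        s, e = h * 60, h * 60 + 60
        free = all(e <= to_min(a) or s >= to_min(b) for a, b in events)
        if e <= to_min("14:00") and free:
            valid.add(f"{h:02d}:00")
    claimed = set(claimed_starts)
    if claimed != valid:
        return False, f"expected {sorted(valid)}, got {sorted(claimed)}"
    return True, "ok"

With the demo events this collapses to a single valid start (11:00), so the checker gives the Reflexion loop a hard acceptance signal rather than a format-only one.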

If you agree, in the next lesson I will upgrade Reflexion to:

Reflexion + Checker + Memory Retrieval, so the agent can reflect on and correct wrong answers, not just formatting errors.