Skip to main content

第 6 课:Memory(记忆系统)

(一)三种记忆类型

这是架构层面的区分,不是“概念分类”。

1. 短期记忆(Short-term Memory / Context)

我们在本节课之前接触的所有内容都是短期记忆

短期记忆就是指当前对话上下文,一般直接明文写入 prompt 的 message 部分,每轮对话用 append 方式更新。

一般包含:对话要求,当前任务的中间结果,最近的 Observation

由于受到 token 使用量的限制,最大容量极小,而且无法方便地管理。因此也不能长期存储知识,不能当数据库,也不能当学习机制。

messages = [
    {"role": "system", "content": SYSTEM_PROMPT},
    {"role": "user", "content": json.dumps({"state": state, "tools_available": sorted(tools_available)}, ensure_ascii=False)},
    ]

2. 工作记忆(Working Memory / State)

工作记忆一般主要存储当前步骤的目标,补充的注意事项,已经完成的内容等。一般用变量 state 表示。

state = {
    "goal": "Schedule a meeting",
    "constraints": {...},
    "steps_done": ["enumerate_slots"],
    "current_plan": "filter conflicts",
    "budget": {"steps_left": 3}
}

工作记忆 State 由代码逻辑直接控制和维护,而不是由 LLM 生成。

它以 json/dict 的结构化格式保存在内存中,而不是 自然语言格式。因此可以用硬编码的逻辑直接管理检索。

danger

重点区分程序记忆模型记忆 的区别

  • 程序记忆:维持在本地内存中,以变量的形式存在,哪怕没有被作为 prompt 的一部分上传给模型,只要这个变量没有被消除,程序就可以调用它,这个记忆就始终“可用”
  • 模型记忆:也是在本地内存中,但是区别在于只要这段内容没有被作为 prompt 上传到模型侧,模型就“不记得”

工作记忆 State 既可以供程序直接使用,也可以转化成 str 后插入 prompt,成为 短期记忆的一部分。(例如上面的例子)

也可以不输入到 短期记忆中,例如:

state = {
    "goal": "Schedule meeting",
    "steps_done": ["enumerate_slots"]
}

messages = [
    {"role": "system", "content": SYSTEM_PROMPT},
    {"role": "user", "content": "Continue."}
]

此时:

  • 程序 仍然知道 已经做过 enumerate_slots
  • 模型 不知道
  • 程序可以:
    • 强制下一步只能 filter_conflicts
    • 拒绝模型乱来

👉 State 还活着,短期记忆已经失忆


而反过来

messages = [
    {"role": "user", "content": '{"steps_done": ["enumerate_slots"]}'}
]
state = {}  # 程序没记录

此时:

  • 模型“以为”自己已经枚举过
  • 程序无法验证
  • 这一步一旦错了,你无从发现

👉 这是“幻觉最容易发生的地方”


因此,工作记忆一般被用于:

  • Planner / Executor 的状态流
  • 防止模型“忘记自己在干什么”
  • 限制行动空间(guardrails)

👉 State 是“程序的记忆”,不是“模型的记忆”


3. 长期记忆(Long-term Memory)

长期记忆则不再以 str 和 json 形式,保存在单个对话的临时变量内存中,而是保存在数据库中,可以跨对话、跨任务,可以检索(embedding + similarity)和复用。

通常是:

  • 向量数据库(semantic memory)
  • 或结构化规则库(symbolic memory)

4. 三种记忆的关系

长期记忆 ──检索──▶ 短期记忆(prompt)
▲ │
│ ▼
写入◀──总结/反思── 工作记忆(state)


(二)长期记忆的实现

你要做“长期记忆”,本质上是要让 Agent 具备能力:

把过去学过的东西(知识/规则/经验)存起来,并在未来遇到类似问题时快速找回来。

而“向量数据库检索”这条路的核心流程是:

  1. 把文本切成一段段(chunk)
  2. 把每段文本变成一个向量(embedding)
  3. 用户来一个新问题 → 把问题也变成向量
  4. 用向量相似度找最相关的几个 chunk(Top-K)
  5. 把这些 chunk 塞回 prompt(短期记忆)给模型用

这个流程就是所谓的 RAG


1. 什么是 chunk?

chunk = 你存进长期记忆的一条“最小可检索片段”

你可以把长期记忆当成一本书的索引系统:

  • 如果你把整本书当成一个条目:搜索很难命中正确位置(太粗)
  • 如果你把每个字当成条目:条目太碎,检索噪声很大(太细)

所以要切成合适长度的“段落级单位”——这就是 chunk。

chunk 通常包含:

  • text: 文本内容(你要记住的那段话)
  • type: 记忆类别(knowledge / rule / reflection / event…)
  • tags: 标注(用于过滤、路由、多通道记忆)
  • 还可以加:sourcetimestampconfidenceid

例子:

chunk = {
  "text": "When scheduling problems mention 'after X', interpret it as strictly later than X.",
  "type": "rule",
  "tags": ["time_constraint", "boundary_condition"]
}

意思是:把“时间边界解释规则”作为一条可检索规则记忆存进去,未来遇到“after 9:00”这种问题可以检索出来。

不同“记忆类型”天然粒度不同:

类型为什么这样切
知识(200–500 tokens)需要上下文完整,段落级最合适
对话总结(100–200 tokens)总结本来就短,太长会浪费 token
规则(1条=1chunk)规则要精确可复用,不要混在一起
反思(1失败=1chunk)每次失败的经验是独立样本,方便检索与去重


那么如何 用代码 按照语意 来 把长文切成 chunk 呢?

  • 规则/反思:结构化写入,写一条规则就是一个 chunk。

  • 知识文档:按 token / 段落切

    • 字符近似:仅适合本地测试,字符数 ≈ token 数 × 3~4,每个 chunk 控制在一个大概范围

      from typing import List
      
      def chunk_by_chars(
          text: str,
          max_chars: int = 1200,
          overlap: int = 150
      ) -> List[str]:
      
          chunks = []
          start = 0           # 切分的起始位置
          text = text.strip() # 清除首尾/n
      
          while start < len(text):
              end = min(len(text), start + max_chars)
              chunk = text[start:end].strip()
      
              if chunk:
                  chunks.append(chunk)
      
              # 下一个 chunk 起点(带 overlap)
              start = end - overlap
              if start < 0:
                  start = 0
      
              if end == len(text):
                  break
      
          return chunks
      
      
      if __name__ == "__main__":
          doc = (
              "Agents are systems that combine a language model with tools and memory. "
              "They can plan, act, observe results, and iterate. "
              "Long-term memory allows agents to store experiences and retrieve them later. "
              * 20
          )
          print(type(doc))
          chunks = chunk_by_chars(doc, max_chars=500, overlap=80)
      
          for i, c in enumerate(chunks):
              print(f"\n--- Chunk {i} (len={len(c)}) ---\n{c[:120]}...")
      
      --- Chunk 0 (len=500) ---
      Agents are systems that combine a language model with tools and memory. They can plan, act, observe results, and iterate...

      --- Chunk 1 (len=500) ---
      t combine a language model with tools and memory. They can plan, act, observe results, and iterate. Long-term memory all...

      --- Chunk 2 (len=499) ---
      odel with tools and memory. They can plan, act, observe results, and iterate. Long-term memory allows agents to store ex...

      --- Chunk 3 (len=500) ---
      mory. They can plan, act, observe results, and iterate. Long-term memory allows agents to store experiences and retrieve...

      这段代码每 500 个字符作为一个 chunk,每个chunk 之间有 80 个字符的 重叠部分

    • tokenizer:需要 tokenizer(或 API)

      常见 tokenizer 选择

      tokenizer适用模型
      tiktokenOpenAI / GPT-4 / GPT-4o
      HuggingFace tokenizerLLaMA / Qwen / DeepSeek

      这里采用 tiktoken

      pip install tiktoken

      下面是这段代码的逻辑:

      1. 把原始文本转化成 token 列表

      2. 每 200 个 token 作为一个 chunk,每个chunk 之间有 40 个token 的 重叠部分

      3. 把每个 chunk 对应的 token 列表重新转化成 文本

      import tiktoken
      from typing import List
      
      def chunk_by_tokens(
          text: str,
          max_tokens: int = 300,
          overlap: int = 50,
          model_name: str = "gpt-4o"
      ) -> List[str]:
          """
          使用 tiktoken 按 token 数精确切分
          """
          enc = tiktoken.encoding_for_model(model_name)
          print(enc)
          tokens = enc.encode(text)  # 把文本改写成 token 列表,如 [112240, 553, 7511, 484 ……
      
          chunks = []
          start = 0
      
          while start < len(tokens):
              end = min(len(tokens), start + max_tokens)
              token_chunk = tokens[start:end]
      
              chunk_text = enc.decode(token_chunk).strip() # 把 token 列表重新转化成 文本
              if chunk_text:
                  chunks.append(chunk_text)
      
              start = end - overlap
              if start < 0:
                  start = 0
      
              if end == len(tokens):
                  break
      
          return chunks
      
      
      if __name__ == "__main__":
          doc = (
              "Agents are systems that combine a language model with tools and memory. "
              "They can plan, act, observe results, and iterate. "
              "Long-term memory allows agents to store experiences and retrieve them later. "
              * 20
          )
      
          chunks = chunk_by_tokens(
              doc,
              max_tokens=200,
              overlap=40,
              model_name="gpt-4o"
          )
      
          for i, c in enumerate(chunks):
              print(f"\n--- Chunk {i} ---")
              print(c[:120], "...")
      
      

而工程上你会把每个 chunk 包装成 MemoryChunk(type="knowledge") 写入向量库。


2. 什么是 embedding?

embedding = 把一段文本变成一个高维向量(例如 768 维 / 1536 维),让计算机可以用数学方式衡量“语义相似”。

类比一下:

  • 文本:人类理解
  • 向量:机器理解

向量空间里:语义相似的文本 → 向量距离更近 / 内积更大

所以检索不是靠关键词,而是靠“语义相似”。

那么如何把 一段chunk文本变成一个向量呢?

  • 用 embedding API:OpenAI / 其他服务提供 embedding 模型,质量高

    import requests
    BASE_URL = "https://api.cometapi.com/v1"   
    
    def embed_with_api(text: str, model: str = "text-embedding-3-large"):
        url = f"{BASE_URL}/embeddings"
        headers = {
            "Authorization": f"Bearer {API_KEY}",
            "Content-Type": "application/json"
        }
        payload = {
            "model": model,
            "input": text
        }
    
        resp = requests.post(url, headers=headers, json=payload, timeout=30)
        resp.raise_for_status()
        data = resp.json()
    
        # 这是一个 list[float]
        embedding = data["data"][0]["embedding"]
        return embedding
    
    
    if __name__ == "__main__":
        text = "When scheduling problems mention 'after X', interpret it as strictly later than X."
    
        vec = embed_with_api(text)
        print("Embedding dimension:", len(vec))
        print("First 8 numbers:", vec[:8])
    
    Embedding dimension: 3072
    First 8 numbers: [-0.0036590735, 0.011169595, -0.014485568, 0.033508785, 0.013652608, -0.0048391, 0.013295625, 0.046772677]
  • sentence-transformers:本地跑,方便调试,质量取决于模型

    pip install sentence-transformers numpy
    from sentence_transformers import SentenceTransformer
    import numpy as np
    
    # 常用、轻量、效果不错
    MODEL_NAME = "all-MiniLM-L6-v2"
    
    model = SentenceTransformer(MODEL_NAME)
    
    def embed_with_st(text: str):
        # 返回 numpy array
        vec = model.encode(text, normalize_embeddings=True)
        return vec
    
    
    if __name__ == "__main__":
        text = "When scheduling problems mention 'after X', interpret it as strictly later than X."
    
        vec = embed_with_st(text)
        print("Embedding shape:", vec.shape)
        print("First 8 numbers:", vec[:8])
    
    
    Embedding shape: (384,)
    First 8 numbers: [-0.02017134 0.0222491 0.02075322 0.02680894 0.01276185 -0.00740438
    0.00387796 -0.05653415]

    384 维(这个模型的固定维度)


思考:为什么两种方法得到的向量形状不同?

你可以理解为:

  • 384 维:压缩后的“摘要特征”
  • 3072 维:更接近“完整语义状态”

维度差异来自:模型架构 + 训练目标 + 工程定位

  • 3072 维:大模型 API embedding(高保真、对齐 LLM)
  • 384 维:轻量 sentence-transformers(高效、通用检索)all-MiniLM-L6-v2固定输出维度就是 384

注意:相似度计算必须要在 同一模型内部,embedding 向量只在“同一个向量空间”里比较才有意义

也就是说:

  • 384 维 vs 384 维 → OK
  • 3072 维 vs 3072 维 → OK
  • ❌ 384 vs 3072 → 毫无意义

3. 向量数据库

向量数据库解决的问题非常具体:

给你一堆文本(chunk),每段文本都有一个 embedding 向量。 你输入一个 query,也做 embedding。 向量数据库要做的是:快速找到与 query 向量最相似的 Top-K 向量,并返回它们对应的原文 chunk。

它本质是一个“语义检索引擎”,核心能力包括:

  1. 存储:保存向量(embedding)+ 元数据(text/type/tags/id/source…)
  2. 索引:构建近似最近邻索引(ANN),让检索从 O(N) 变成近似 O(log N) 或更快
  3. 检索(retrieve):Top-K nearest neighbors(按 cosine / inner product / L2)
  4. 过滤/路由(生产常用):按 type/tags/time 等过滤后再检索
  5. 持久化与扩展(Milvus 擅长):分布式、高可用、多租户

下面是一个例子,实现了:

  1. 对文本进行 token 化,然后切分为 N 个 chunk,再把 token 转回文本形态
  2. 对 文本形态的 chunk list 进行向量化,得到一个 (N × dim)形状的 numpy 矩阵(dim 为向量大小,由模型类型决定)
  3. 把 单个 chunk 按照 ((明文+详细信息),向量) 为单个条目,写入数据库
from dataclasses import dataclass, field
from typing import List, Dict, Any
import numpy as np
import tiktoken
from sentence_transformers import SentenceTransformer

# 按照 token 切分好 chunk,并重新转化为 str,输出结果是一个 List,内含多个 chunk 文本
def chunk_by_tokens(text: str, max_tokens: int = 60, overlap: int = 10, model_name: str = "gpt-4o") -> List[str]:
    enc = tiktoken.encoding_for_model(model_name)
    tokens = enc.encode(text)

    chunks = []
    start = 0
    while start < len(tokens):
        end = min(len(tokens), start + max_tokens)
        chunk_text = enc.decode(tokens[start:end]).strip()
        if chunk_text:
            chunks.append(chunk_text)

        start = end - overlap
        if start < 0:
            start = 0
        if end == len(tokens):
            break
    return chunks

# 定义向量化工具
class Embedder:
    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        self.model = SentenceTransformer(model_name)
        self.dim = self.model.get_sentence_embedding_dimension() # 获取向量化后的向量维度

    # 向量化工具,把文本转化为一个 numpy 高维向量
    def encode(self, texts: List[str]) -> np.ndarray:  
        vecs = self.model.encode(texts, normalize_embeddings=True)
        return np.asarray(vecs, dtype=np.float32)


# 定义写入数据库的一个条目的格式,每一个条目都要新建一个实例
@dataclass
class MemoryChunk:
    id: int
    text: str
    type: str = "knowledge"
    tags: List[str] = field(default_factory=list)
    meta: Dict[str, Any] = field(default_factory=dict) # 元数据 = 描述数据的来源


# 定义数据库结构
class MemoryStore:
    """
    最小长期记忆存储:
    - chunks: 记忆条目(文本+元数据)
    - vectors: 对应 embedding(与 chunks 按 index 对齐)
    """
    def __init__(self, dim: int):
        self.dim = dim
        self.chunks: List[MemoryChunk] = []
        self.vectors: List[np.ndarray] = []

    def add(self, chunk: MemoryChunk, vector: np.ndarray):
        assert vector.shape == (self.dim,)
        self.chunks.append(chunk)
        self.vectors.append(vector)


if __name__ == "__main__":
    doc = """
Agent systems combine an LLM with tools and memory to perform tasks.
Long-term memory stores chunks of validated knowledge, rules, and reflections.
Chunking splits long text into retrievable pieces. Embeddings convert text to vectors.
Vector search finds semantically similar chunks via cosine similarity.
Tool calling lets the model request structured function invocations.
Guardrails restrict action space and validate outputs to reduce hallucinations.
    """.strip()

    # 1) chunk
    chunk_texts = chunk_by_tokens(doc, max_tokens=25, overlap=5) # 是一个list,形如 [str,str,……]

    # 2) embed
    embedder = Embedder()
    vecs = embedder.encode(chunk_texts)  # 是一个 numpy 向量,维度为 (chunk 数 N, 单个 array 维度 dim)
    # 注:同一个模型,生成的 向量的维度是固定的,因为输出层的节点数是固定的

    # 3) store
    store = MemoryStore(dim=embedder.dim)
    for i, (t, v) in enumerate(zip(chunk_texts, vecs)):
        chunk = MemoryChunk(
            id=i,    # 第几个 chunk
            text=t,  # 该 chunk 的对应文本
            type="knowledge",  # 该 chunk 的 类型,分为:knowledge,rule,reflection,event,persona/user_profile
            tags=["agent", "memory"], # 该 chunk 的标签
            meta={   # 元数据 = 描述数据的来源
                "source": "demo_doc_v1",
                "created_at": "2025-12-27",
                "token_max": 25,
                "overlap": 5,
            }
        )
        store.add(chunk, v) # 把一个 chunk 和 对应的 向量存入

    print("Stored items:", len(store.chunks))
    print("Example item:", store.chunks[0])
    print("Example vector dim:", store.vectors[0].shape)
Stored items: 4
Example item: MemoryChunk(id=0, text='Agent systems combine an LLM with tools and memory to perform tasks.\nLong-term memory stores chunks of validated knowledge, rules,', type='knowledge', tags=['agent', 'memory'], meta={'source': 'demo_doc_v1', 'created_at': '2025-12-27', 'token_max': 25, 'overlap': 5})
Example vector dim: (384,)

该代码运行完成后,实例 store 内部的 chunks 和 vectors 属性各自维护一个 list,内部元素意义对应,分别是一个 chunk 的 MemoryChunk 实例 和 向量化结果。

思考:为什么要 class MemoryChunk?直接存字符串不行吗?

MemoryChunk 中保存更多的背景信息,包括创建条件,来源,类型,版本,时间等,和对应的 embedding 对齐的 主键 ID


在生产环境下,我们一般不会自己手写 MemoryStore 这个数据库格式,而是使用常用的数据库包括:

DB适合
FAISS本地 / 单机 / 研究
Milvus分布式 / 生产
Redis中小规模 + 高速
Chroma快速原型

3.1 FAISS

FAISS(Facebook AI Similarity Search)严格说是一个向量索引库,可以本地高性能 ANN 检索,但是 不负责真正的数据库能力(权限、多租户、分布式、强持久化、复杂过滤等),适合先进行本地验证。

下面是一个代码例子

pip install faiss-cpu sentence-transformers tiktoken numpy

FAISS 的 index.add(vectors) 本质就是“写入向量”。你可以理解为:

  • list 版:你自己管理向量数组
  • FAISS 版:你把向量交给 FAISS 管理(以后检索才会快)
from dataclasses import dataclass, field
from typing import List, Dict, Any, Tuple
import numpy as np
import faiss
import tiktoken
from sentence_transformers import SentenceTransformer


# ---------- 1) Chunking (token-based) ----------
def chunk_by_tokens(text: str, max_tokens: int = 300, overlap: int = 50, model_name: str = "gpt-4o") -> List[str]:
    enc = tiktoken.encoding_for_model(model_name)
    tokens = enc.encode(text)

    chunks = []
    start = 0
    while start < len(tokens):
        end = min(len(tokens), start + max_tokens)
        chunk_tokens = tokens[start:end]
        chunk_text = enc.decode(chunk_tokens).strip()
        if chunk_text:
            chunks.append(chunk_text)

        start = end - overlap
        if start < 0:
            start = 0
        if end == len(tokens):
            break
    return chunks


# ---------- 2) Memory chunk schema ----------
@dataclass
class MemoryChunk:
    text: str
    type: str = "knowledge"
    tags: List[str] = field(default_factory=list)
    meta: Dict[str, Any] = field(default_factory=dict)


# ---------- 3) Embedding model ----------
class Embedder:
    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        self.model = SentenceTransformer(model_name)
        self.dim = self.model.get_sentence_embedding_dimension()  # 获取 该模型的输出向量形状,其实就是输出层形状

    def encode(self, texts: List[str]) -> np.ndarray:
        # normalize=True => cosine similarity can be done by inner product
        vecs = self.model.encode(texts, normalize_embeddings=True)
        return np.asarray(vecs, dtype=np.float32)


# ---------- 4) FAISS Vector Store ----------
class FaissVectorStore:
    """
    FAISS 只存向量;元数据我们自己存(chunks 列表按 index 对齐)
    使用 IndexFlatIP(Inner Product),配合 normalize_embeddings=True 等价 cosine
    """
    def __init__(self, dim: int):
        self.index = faiss.IndexFlatIP(dim)  # 规定了每个向量的 形状一样(模型输出层的形状)
        self.chunks: List[MemoryChunk] = []

    def add(self, vectors: np.ndarray, chunks: List[MemoryChunk]):
        assert vectors.ndim == 2 and vectors.shape[0] == len(chunks)
        self.index.add(vectors)    # 是一个 shape: (N, dim),一行对应一个 chunk 的 向量
        self.chunks.extend(chunks) # 是一个 List, 一个元素对应一个chunk 的 MemoryChunk 实例



if __name__ == "__main__":
    # 0) Prepare document
    doc = """
Agent systems combine an LLM with tools and memory to perform tasks.
Long-term memory stores chunks of validated knowledge, rules, and reflections.
Chunking splits long text into retrievable pieces. Embeddings convert text to vectors.
Vector search finds semantically similar chunks via cosine similarity.
Tool calling lets the model request structured function invocations.
Guardrails restrict action space and validate outputs to reduce hallucinations.
    """.strip()

    # 1) Chunking
    chunks_text = chunk_by_tokens(doc, max_tokens=20, overlap=3, model_name="gpt-4o")
    chunks = [MemoryChunk(text=t, type="knowledge", tags=["agent", "memory"]) for t in chunks_text]

    # 2) Embedding
    embedder = Embedder("all-MiniLM-L6-v2")
    vectors = embedder.encode([c.text for c in chunks])  # shape: (N, dim)

    # 3) Build FAISS store and add
    store = FaissVectorStore(dim=embedder.dim)
    store.add(vectors, chunks)


储存逻辑是和之前一样的。



3.2 Milvus

Milvus 是真正的向量数据库(Vector DBMS)。与“FAISS 索引库”相比,它提供:

  1. 持久化存储(磁盘、数据可靠)
  2. 分布式扩展(横向扩容、分片)
  3. 过滤查询(按字段过滤 + 向量检索)
  4. 集合/Schema 管理(collections、fields、indexes)
  5. 生产能力(多租户、监控、备份、权限/认证等)

工程上你可以这样理解:

  • FAISS:你自己维护 “向量索引 + 元数据存储 + 持久化”
  • Milvus:把这些打包成一个可运维的数据库服务

部署方法:在服务器或本机:

docker run -d --name milvus-standalone \
-p 19530:19530 -p 9091:9091 \
milvusdb/milvus:v2.4.0 \
milvus run standalone

19530:gRPC 端口(pymilvus 默认连这个) 9091:HTTP/metrics(视版本而定)

安装客户端

pip install pymilvus

最小可用代码:建表 + 插入 + 检索

下面示例用一个 schema:

  • id:主键
  • text:原文
  • type:字符串字段(用于过滤)
  • embedding:向量字段
from pymilvus import (
    connections, FieldSchema, CollectionSchema, DataType,
    Collection, utility
)

MILVUS_HOST = "localhost"
MILVUS_PORT = "19530"
COLLECTION_NAME = "agent_memory"
DIM = 384  # 你用 all-MiniLM-L6-v2 就是 384

def setup_collection():
    connections.connect(alias="default", host=MILVUS_HOST, port=MILVUS_PORT)

    if utility.has_collection(COLLECTION_NAME):
        return Collection(COLLECTION_NAME)

    fields = [
        FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
        FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=4096),
        FieldSchema(name="type", dtype=DataType.VARCHAR, max_length=64),
        FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=DIM),
    ]
    schema = CollectionSchema(fields, description="Agent long-term memory")

    col = Collection(COLLECTION_NAME, schema)

    # 建索引(HNSW/IVF等;这里给 HNSW 示例)
    index_params = {
        "index_type": "HNSW",
        "metric_type": "IP",  # cosine 用 IP + normalize
        "params": {"M": 16, "efConstruction": 200},
    }
    col.create_index(field_name="embedding", index_params=index_params)
    col.load()
    return col

def insert_chunks(col, texts, types, vectors):
    # vectors: List[List[float]] or np.ndarray -> 转 python list
    entities = [
        texts,   # text
        types,   # type
        vectors  # embedding
    ]
    # 注意:auto_id=True,id 不用传
    col.insert(entities)
    col.flush()

def search(col, query_vec, k=5, type_filter=None):
    search_params = {"metric_type": "IP", "params": {"ef": 64}}
    expr = None
    if type_filter:
        expr = f'type == "{type_filter}"'

    results = col.search(
        data=[query_vec],
        anns_field="embedding",
        param=search_params,
        limit=k,
        expr=expr,
        output_fields=["text", "type"]
    )
    hits = results[0]
    return [(hit.entity.get("type"), hit.entity.get("text"), float(hit.score)) for hit in hits]

if __name__ == "__main__":
    col = setup_collection()
    print("Collection ready.")

    # 这里假设你已经有 embeddings(DIM=384):
    # texts = [...]
    # types = [...]
    # vectors = [...]  # List[List[float]]
    #
    # insert_chunks(col, texts, types, vectors)
    #
    # query_vec = ...
    # print(search(col, query_vec, k=3, type_filter="rule"))

这段 Milvus 代码体现了什么“数据库能力”?

  • CollectionSchema:你在定义一个表结构(字段类型、长度、向量维度)
  • create_index:数据库负责索引构建与维护
  • expr过滤检索(这是 Milvus 相比纯 FAISS 方案的关键优势)
  • output_fields:返回元数据字段,无需你手动维护 chunks[idx] 对齐


4. Top-K 检索 + 过滤

我们之前提到

  1. 用户来一个新问题 → 把问题也变成向量
  2. 用向量相似度找最相关的几个 chunk(Top-K)

那 Top-K 具体是怎么实现的呢?

你有两样东西:

  • 向量库:V=v1,v2,,vNV = {v_1, v_2, \dots, v_N},每个 viRdv_i \in \mathbb{R}^d
  • 查询向量:qRdq \in \mathbb{R}^d

你定义一个相似度函数 s(q,vi)s(q, v_i)(常见三种):

  1. 内积(Inner Product)s=qvs = q^\top v
  2. 余弦相似度(Cosine Similarity)s=qvqvs = \frac{q^\top v}{|q||v|}
  3. 欧氏距离(L2 distance)d=qv2d = |q - v|_2(越小越相似)

Top-K 查询就是求:

  • 相似度最大:取 K 个最大的 s(q,vi)s(q, v_i)
  • 或距离最小:取 K 个最小的 d(q,vi)d(q, v_i)

如果不使用 FAISS,完全可以用 numpy 做同样的事情

假设你有一个矩阵:

  • ERN×dE \in \mathbb{R}^{N \times d}:所有向量按行堆叠
  • qRd×1q \in \mathbb{R}^{d \times 1}:查询向量

我们知道两个单位向量越靠近,乘机越接近 1。

因此如果向量都归一化,那么相似度计算直接把两个矩阵相乘即可:

scores=Eqscoresi=viqscores = E q \quad \Rightarrow \quad scores_i = v_i^\top q

这是一次矩阵乘法,得到一个长度为 N 的分数数组。

scores 里取最大的 K 个索引即可:

  • argpartition:O(N) 近似分割(更快)
  • argsort:O(N log N) 全排序(更慢)

FAISS 的 IndexFlatIP 实际上就是把这个流程做成 C++ 高性能版。


使用 FAISS 的代码实现,其他部分不变

class FaissVectorStore:
    """
    FAISS 只存向量;元数据我们自己存(chunks 列表按 index 对齐)
    使用 IndexFlatIP(Inner Product),配合 normalize_embeddings=True 等价 cosine
    """
    def __init__(self, dim: int):
        self.index = faiss.IndexFlatIP(dim)
        self.chunks: List[MemoryChunk] = []

    def add(self, vectors: np.ndarray, chunks: List[MemoryChunk]):
        assert vectors.ndim == 2 and vectors.shape[0] == len(chunks)
        self.index.add(vectors)
        self.chunks.extend(chunks)

    def search(self, query_vec: np.ndarray, k: int = 5) -> List[Tuple[MemoryChunk, float]]:
        assert query_vec.shape == (1, self.index.d)
        scores, idxs = self.index.search(query_vec, k)
        results = []
        for j, idx in enumerate(idxs[0]):
            if idx == -1:
                continue
            results.append((self.chunks[idx], float(scores[0][j])))
        return results
      
if __name__ == "__main__":
    # 0) Prepare document
    doc = """
Agent systems combine an LLM with tools and memory to perform tasks.
Long-term memory stores chunks of validated knowledge, rules, and reflections.
Chunking splits long text into retrievable pieces. Embeddings convert text to vectors.
Vector search finds semantically similar chunks via cosine similarity.
Tool calling lets the model request structured function invocations.
Guardrails restrict action space and validate outputs to reduce hallucinations.
    """.strip()

    # 1) Chunking
    chunks_text = chunk_by_tokens(doc, max_tokens=20, overlap=3, model_name="gpt-4o")
    chunks = [MemoryChunk(text=t, type="knowledge", tags=["agent", "memory"]) for t in chunks_text]

    # 2) Embedding
    embedder = Embedder("all-MiniLM-L6-v2")
    vectors = embedder.encode([c.text for c in chunks])  # shape: (N, dim)

    # 3) Build FAISS store and add
    store = FaissVectorStore(dim=embedder.dim)
    store.add(vectors, chunks)

    # 4) Query
    query = "How does chunking relate to embeddings and retrieval?"
    qvec = embedder.encode([query])  # shape: (1, dim)

    results = store.search(qvec, k=3)
    for rank, (c, score) in enumerate(results, 1):
        print(f"\n#{rank} score={score:.3f}")
        print(c.text)

其中 FaissVectorStore 类用了:

self.index = faiss.IndexFlatIP(dim)

IP = Inner Product(内积)。

并且 embedding 生成时你做了:

normalize_embeddings=True

这两个组合在一起意味着:

向量都被归一化到单位长度:|q|=|v|=1 所以:qv=cos(q,v)q^\top v = \cos(q, v)

也就是说:用内积做 Top-K = 用余弦相似度做 Top-K(在向量已归一化的前提下)。

然后

scores, idxs = self.index.search(query_vec, k)

返回:

  • scores: shape = (1, k) 第 j 个就是第 j 名的相似度分数(内积/余弦)
  • idxs: shape = (1, k) 第 j 个是向量在 index 里的 ID(按插入顺序的编号)

然后你做:

results.append((self.chunks[idx], float(scores[0][j])))

因为你把 chunks 按插入顺序 extend 进列表,所以:

FAISS 的向量 id = chunks 列表的下标

这就是“向量库只存向量,元数据你自己存,并按顺序对齐”的核心机制。


思考:为什么需要“Top-K”,而不是只取 Top-1?

工程上 Top-K 的意义是:

  1. 鲁棒性:embedding 不稳定/语义有噪声,Top-1 可能偏
  2. 覆盖不同角度:同一问题可能关联多个 chunk(规则 + 背景知识)
  3. RAG 注入:你要把多个候选 chunk 作为检索上下文喂给 LLM

因此常见做法是:

  • k=5~20
  • 再用“二次过滤/重排”(MMR、RRF、cross-encoder rerank)

5. 大样本Top-K 查询:ANN

当 N 小(比如几万、几十万)时,精确检索还能接受。 当 N 很大(百万、千万、亿级)时,精确检索每次都扫全库就太慢了。

这时向量数据库会用 ANN(Approximate Nearest Neighbor,近似最近邻)

  • 目标:牺牲一点点准确率,换取数量级的速度提升
  • 结果:Top-K 变成“高概率正确”的 Top-K,而不是数学严格 Top-K

FAISS/Milvus 的核心价值,就在于 ANN 索引。

你当前代码里是:

self.index = faiss.IndexFlatIP(dim)

IndexFlatIP 是什么?

  • Flat = 扁平索引
  • 意味着:所有向量一个不落,全扫
  • 计算方式: 对每个向量 viv_i,算一次 qviq \cdot v_i

也就是说:你的 .search() 是 100% 精确搜索,不是 ANN


ANN 有几个实现方法

  • 方法一: IVF(倒排文件,Inverted File Index)

    思路:先把向量空间聚类成很多“桶”(centroids)。

    查询时:

    1. 找离 q 最近的几个桶(nprobe 个)
    2. 只在这些桶里的向量做 Top-K

    复杂度从扫全库变成扫“少量桶”。

    关键参数:

    • nlist:桶数量
    • nprobe:查询时探测多少桶(越大越准越慢)
    quantizer = faiss.IndexFlatIP(dim)
    index = faiss.IndexIVFFlat(
        quantizer,
        dim,
        nlist=1024,     # 聚类中心数量
        metric=faiss.METRIC_INNER_PRODUCT
    )
    index.train(vectors)
    index.add(vectors)
    
    index.nprobe = 16  # 查询时探测的桶数量
    

    这时:

    index.search(query_vec, k)

    就是 ANN 搜索了,因为:只在 nprobe 个桶里找,不是全库扫描


  • 方法二:HNSW(图索引,Hierarchical Navigable Small World)

    思路:把向量构成“近邻图”,查询时从图上贪心走,逐步接近最近邻。

    特点:

    • 查询快
    • 构建/插入相对重
    • 在很多场景下效果极好(Milvus 默认常用)

    关键参数:

    • M:每个节点连边数
    • efConstruction:建图精度
    • ef:查询时探索宽度(越大越准越慢)
    index = faiss.IndexHNSWFlat(dim, M=32)
    index.hnsw.efSearch = 64
    index.add(vectors)
    

    这时:

    index.search(query_vec, k)

    也是 ANN 搜索,因为:

    • 在图上做启发式遍历
    • 不保证数学上最优,但概率上很准

  • 方法三:PQ(乘积量化,Product Quantization)

    思路:压缩向量,减少内存与计算量,用近似距离计算。

    特点:

    • 超大规模、内存敏感场景
    • 精度会下降,但可控

所以真正的“是否 ANN”判断标准是:

Index 类型是否 ANN说明
IndexFlatIP❌ 否全量精确搜索
IndexFlatL2❌ 否全量精确搜索
IndexIVFFlat✅ 是聚类 + 局部搜索
IndexIVFPQ✅ 是聚类 + 压缩
IndexHNSWFlat✅ 是图搜索
IndexPQ✅ 是向量压缩

👉 .search() 永远只是“执行搜索” 👉 “近似 or 精确”由 index 决定



几个注意事项:

  1. 不要混用 embedding 模型 否则向量空间不一致,Top-K 结果完全不可信。
  2. metric 要和 normalize 策略匹配
    • cosine:normalize + IP
    • L2:不 normalize,直接 L2
  3. K 不要过大 取太多噪声会变多,注入 prompt 反而降低回答质量。
  4. Top-K 之后通常要过滤/重排 典型:MMR 去冗余、RRF 融合多路检索、cross-encoder rerank 提升精度。


6. 检索增强生成(RAG)

我们上面已经学习了如何构建+检索一个向量知识库,用于在提问时获取到相关的知识。

我们还没有学如何在 检索到相关向量知识后,把 知识塞入到 context prompt 中。加上这一步,就是 完整的 RAG 步骤

RAG = Retrieval-Augmented Generation(检索增强生成)

  • Retrieval:用 embedding + 向量数据库,从外部知识源中找相关信息
  • Augmented:把检索结果作为上下文(context)注入模型
  • Generation:模型基于“检索到的事实”生成答案
用户问题

检索(向量数据库 Top-K)

相关资料(chunks)

LLM(基于资料生成)

回答(更准、更可控)

怎么把检索到的内容插入 prompt 也很简单,我们已经得到了匹配度最高的几个 向量,使用他们对应的 chunk 文本插入到 prompt 中即可。(注意不是往 prompt 中插入 向量)

先把检索结果拼成「Context 块」

def build_context(chunks):
    return "\n\n".join(
        f"[{i+1}] {chunk.text}"
        for i, chunk in enumerate(chunks)
    )

明确告诉模型:只能用这些内容回答

messages = [
    {
        "role": "system",
        "content": (
            "You are a QA assistant.\n"
            "Answer the question ONLY using the provided context.\n"
            "If the answer is not in the context, say 'I don't know'."
        )
    },
    {
        "role": "system",
        "content": f"Context:\n{context_text}"
    },
    {
        "role": "user",
        "content": user_query
    }
]

RAG 中: 👉 向量只用于“检索”, 👉 插入 prompt 的永远是「原始文本 chunk」,而不是向量。



(三) 向量检索结果的优化

我们已经学会了 RAG 的基本流程

- chunk
- embedding
- Top-K
- 拼 context
- 问答

可以进一步优化为

- 多类型 chunk(rule / knowledge)
- 向量数据库
- Top-K + 过滤
- prompt 约束“只基于 context”
- 防幻觉

更多的技巧包括:

- 多路检索(BM25 + vector)
- RRF / MMR
- Cross-encoder rerank
- Query rewrite
- 多 hop RAG
- Agent-driven RAG(planner 决定查什么)

1. 筛选结果去重:MMR

你做 Top-K 向量检索后,常见问题是 Top-K 里很多 chunk 高度相似(同一段话被不同切分、同一主题的重复表述)

MMR 的目标:在“尽量相关(relevant)”的前提下,挑出“尽量不重复(diverse)”的一组 chunk。

一句话:Top-K 负责“找相关”,MMR 负责“去冗余 + 增覆盖”。

MMR 的原理

  • 查询向量 (q)
  • 候选向量集合 C(比如先从向量库取 Top-50)
  • 已选集合 S(初始为空)
  • 相似度函数 sim(常用 cosine / inner product)
  • 超参数 λ[0,1]\lambda \in [0,1]

MMR 每一步选择一个新的候选 dCSd\in C\setminus S,最大化:

MMR(d)=λsim(d,q)(1λ)maxsSsim(d,s)\text{MMR}(d) = \lambda \cdot \text{sim}(d, q) - (1-\lambda)\cdot \max_{s\in S}\text{sim}(d, s)

解释:

  • 第一项:与 query 的相关性(越大越好)
  • 第二项:与已选结果的相似度惩罚(越重复惩罚越大)
  • λ\lambda 越大 → 更偏“相关性”
  • λ\lambda 越小 → 更偏“多样性/去冗余”

工程经验:

  • λ=0.70.9\lambda=0.7\sim0.9:常用,既相关又不太重复
  • 候选池先取 TopN=20~100,再用 MMR 选出 K=5~20

简洁代码示例

  • cand_vecs: shape (N, d),候选 chunk 的向量(已 normalize)
  • qvec: shape (d,)
  • k: 你最终要选的数量
  • lambda_: MMR 权衡参数
import numpy as np

def mmr_select(cand_vecs: np.ndarray, qvec: np.ndarray, k: int = 8, lambda_: float = 0.8):
    """
    cand_vecs: (N, d) 候选向量(建议已 normalize)
    qvec:      (d,)   查询向量(建议已 normalize)
    return:    选中索引列表(长度 <= k)
    """
    N = cand_vecs.shape[0]
    if N == 0:
        return []

    # 相关性:与 query 的相似度
    rel = cand_vecs @ qvec  # (N,)

    selected = []
    selected_mask = np.zeros(N, dtype=bool)

    # 先选最相关的一个
    first = int(np.argmax(rel))
    selected.append(first)
    selected_mask[first] = True

    # 预先算候选之间的相似度矩阵(N 不大时最简单)
    sim_mat = cand_vecs @ cand_vecs.T  # (N, N)

    while len(selected) < min(k, N):
        # 对每个未选候选,计算与已选集合的最大相似度(重复度)
        max_sim_to_selected = np.max(sim_mat[:, selected], axis=1)  # (N,)

        # MMR 分数:相关性 - 重复惩罚
        mmr = lambda_ * rel - (1 - lambda_) * max_sim_to_selected

        # 禁止选已选过的
        mmr[selected_mask] = -np.inf

        nxt = int(np.argmax(mmr))
        selected.append(nxt)
        selected_mask[nxt] = True

    return selected

怎么接到你的 RAG 管线里

  1. 向量库先取一个较大的候选池(比如 Top-50)
  2. 对候选池做 mmr_select(..., k=8)
  3. 用这些索引对应的 chunk 文本注入 prompt


2. 多路检索融合:RRF

你经常会有不止一种检索方式(就是按照不同具体要求检索匹配chunk),例如:

  • 向量检索(semantic / embedding)
  • 关键词检索(BM25 / TF-IDF)
  • 规则库检索(只搜 type=rule)
  • 多查询改写(query rewrite)得到的多组 Top-N
  • 多向量空间(不同 embedding 模型)或不同索引

这些检索器各自会输出一个排序列表(ranked list),但它们:

  • 评分尺度不一致(BM25 分数 vs cosine 分数无法直接加权),而 RRF 完全不看原始分数,只看名次(rank),因此天然可融合异构检索器。
  • 向量检索可能漏掉“关键字很重要但语义不明显”的片段(比如 API 参数名、错误码),BM25 可能漏掉“同义改写”的段落,RRF 能把两者的优势叠加,减少单路检索的“盲区”
  • 某一路检索器给了一个非常高分但其实是噪声的结果,RRF 只看它是否在前几名;如果它只在某一路靠前、其他路靠后,融合后通常不会被顶到最前。

**RRF 的目标:**用一种“对分数尺度不敏感”的方式,把多个排序列表融合成一个更稳健的 Top-K。

RRF 的原理(核心公式)

假设有多路检索器输出列表 L1,L2,,LmL_1, L_2, \dots, L_m。 对任意候选文档 (d),它在第 (i) 个列表中的排名为 ranki(d)rank_i(d)(从 1 开始;不在列表里则跳过)。

RRF 分数定义为:

RRF(d)=i=1m1k+ranki(d)\text{RRF}(d) = \sum_{i=1}^{m} \frac{1}{k + rank_i(d)}
  • k 是一个常数(通常 60 左右),用于平滑: 防止 rank=1 的项过度碾压 rank=10 的项
  • 文档在多个列表都靠前 → 分数累加更高 → 排名更靠前
  • 只在某一路靠前、其他路没出现 → 分数有限

直观理解:

  • 你在“多位评委”的排名里反复被提到,且名次靠前 → 总体更可信。

什么时候用 RRF(典型场景)

  1. Hybrid Retrieval:BM25 + Vector
  2. Multi-query Retrieval:同一问题做 query rewrite,得到多组向量检索结果
  3. Multi-index:规则库 + 知识库(不同索引/不同过滤条件)
  4. Multi-embedding:不同 embedding 模型产生不同候选

简洁代码示例(可直接用)

假设你有两路检索结果(只要排名列表即可):

  • ranked_a: 向量检索 Top-N 的 doc_id 列表(从第 1 名到第 N 名)
  • ranked_b: BM25 检索 Top-N 的 doc_id 列表

RRF 融合后输出一个排序好的 doc_id 列表:

from collections import defaultdict
from typing import List, Hashable

def rrf_fuse(rank_lists: List[List[Hashable]], k: int = 60, top_n: int = 10):
    """
    rank_lists: 多路检索器的排名列表,每个元素是 doc_id 按从高到低排序
    k: RRF 常数(常用 60)
    top_n: 返回前 top_n 个
    """
    scores = defaultdict(float)

    for lst in rank_lists:
        for rank, doc_id in enumerate(lst, start=1):  # rank 从 1 开始
            scores[doc_id] += 1.0 / (k + rank)

    fused = sorted(scores.items(), key=lambda x: x[1], reverse=True)
    return [doc_id for doc_id, _ in fused[:top_n]]

用法示例

vector_top = ["c7", "c2", "c9", "c1", "c5"]
bm25_top   = ["c2", "c3", "c5", "c8", "c1"]

fused = rrf_fuse([vector_top, bm25_top], k=60, top_n=5)
print(fused)

一个非常常见的工程顺序:

多路检索 → RRF 融合 → 取 Top-N → MMR 选 Top-K → 注入 prompt

  • k=60 是经典默认值(广泛使用,稳定)
  • 每路取 TopN=20~100 再融合
  • 融合后取 TopN=20~50 做后续 rerank 或 MMR

(四)Memory 的优化

1. 记忆写入策略

之前我们学了怎创建一个 向量数据库, 以及几种检索的办法。但是放到实际对话流程中,什么样的内容才值得被记入 向量数据库呢?长期记忆一旦写错,会出现两类灾难:

  1. 污染(poisoning) 模型把臆测写进去,后续检索又把它当事实,错误会被放大、固化。
  2. 膨胀(bloat) 什么都写会导致:
    • 向量库越来越大、成本升高
    • 检索质量下降(噪声变多)
    • Top-K 变成“随机抽签”

因此工程上必须明确:

哪些信息可以写?以什么粒度写?写在哪里?何时更新/淘汰?

典型可写入项(强建议):

  • 用户明确陈述的稳定事实/偏好(例如“我只用 pip 不用 conda”)
  • 高价值、可复用的知识片段(来自可信文档/工具 observation)
  • 规则/约束(例如“遇到时间约束 after X 解释为严格大于”)
  • 反思(失败经验):但必须可验证、可泛化

典型不写入项(建议跳过):

  • 一次性闲聊、情绪性内容、低复用内容
  • 模型自行推断、没有来源支持的“事实”
  • 过于细碎的中间过程(日志可以存,但不进长期记忆检索库)

例如:

满足以下至少一项才写:

  1. type == "rule" 且可泛化(包含 if-then 或操作约束)
  2. type == "knowledge" 且来源可信(tool/user)并且可复用
  3. type == "preference" 且用户明确表达
  4. type == "reflection" 且是“可执行的改进规则”(不是情绪)

拒绝写入(硬拒绝)

出现以下任意一条直接拒绝写入:

  • 来源为 model_inference 且被当成事实写入
  • 文本包含“我猜”“可能”“大概”但没有验证来源
  • 无法提供 source_id(至少要能追溯到工具输出/用户发言)

2. 记忆管理策略: 多通道记忆

把记忆分“类型”,不要混在一起。

通道内容
Event发生了什么
Knowledge世界知识
Rule规则
Pattern解题/决策模式
Person用户偏好
Tool工具使用经验

工程实现

memory = {
    "rules": VectorDB(),
    "patterns": VectorDB(),
    "events": VectorDB(),
}

检索时:先 rules,再 patterns,再 knowledge

写入内容必须短且“可检索”。通常:

  • knowledge:200–500 tokens(或更短)
  • rule:一句一条
  • reflection:一次失败一条(必须总结成可执行规则)

条记忆都要记录来源与可信度

  • 来源来自 tool_observation / user_statement / model_inference
  • 默认:只有前两者才允许写入“事实类 knowledge”
  • model_inference 只能写入“待验证的假设”,并设置低置信度、短 TTL

维护策略(Update / Dedup / TTL)

  • 去重:相同内容不要重复写
  • 版本化:同一条规则更新版本,不是新增一堆
  • TTL / 过期:会变化的事实要过期(例如价格、政策、日程)

下面是一份最小但可用的写入策略代码。你可以把它接到你的 Agent loop 里:当一次任务结束后,把候选记忆条目交给 maybe_write_memory()

from dataclasses import dataclass, field
from typing import Dict, Any, List, Optional
import time
import hashlib

# ---------- Memory schema ----------
@dataclass
class MemoryItem:
    text: str
    type: str  # "knowledge" | "rule" | "event" | "preference" | "reflection"
    tags: List[str] = field(default_factory=list)
    meta: Dict[str, Any] = field(default_factory=dict)
    created_at: float = field(default_factory=lambda: time.time())
    expires_at: Optional[float] = None
    confidence: float = 0.8
    content_hash: str = ""

def stable_hash(s: str) -> str:
    return hashlib.sha256(s.strip().encode("utf-8")).hexdigest()

# ---------- In-memory "store" (demo) ----------
class MemoryStoreSimple:
    def __init__(self):
        self.items: List[MemoryItem] = []
        self.hash_index: Dict[str, int] = {}  # hash -> item idx

    def upsert(self, item: MemoryItem):
        # 去重 / 版本化(最简:同 hash 覆盖)
        if item.content_hash in self.hash_index:
            idx = self.hash_index[item.content_hash]
            self.items[idx] = item
        else:
            self.hash_index[item.content_hash] = len(self.items)
            self.items.append(item)

    def purge_expired(self):
        now = time.time()
        kept = []
        new_index = {}
        for it in self.items:
            if it.expires_at is not None and it.expires_at <= now:
                continue
            new_index[it.content_hash] = len(kept)
            kept.append(it)
        self.items = kept
        self.hash_index = new_index


# ---------- Write policy ----------
ALLOWED_TYPES = {"knowledge", "rule", "event", "preference", "reflection"}
TRUSTED_SOURCES = {"tool_observation", "user_statement"}  # 可信来源
DEFAULT_TTL_SECONDS = {
    "knowledge": None,            # 默认不过期(也可以按业务设)
    "rule": None,
    "preference": None,
    "event": 365 * 24 * 3600,     # 事件默认保 1 年(可调)
    "reflection": 180 * 24 * 3600 # 反思保 180 天(可调)
}

def should_write(item: MemoryItem) -> (bool, str):
    # 0) 基本合法性
    if item.type not in ALLOWED_TYPES:
        return False, f"invalid type: {item.type}"

    src = item.meta.get("source_type")
    if not src:
        return False, "missing meta.source_type"

    # 1) 硬拒绝:把推断当事实写
    if item.type in {"knowledge", "rule"} and src == "model_inference":
        return False, "refuse: model_inference cannot write knowledge/rule as fact"

    # 2) knowledge 必须来自可信来源
    if item.type == "knowledge" and src not in TRUSTED_SOURCES:
        return False, "refuse: knowledge must come from tool_observation or user_statement"

    # 3) rule 必须可泛化:简单启发式(工程里可更严格)
    if item.type == "rule":
        t = item.text.lower()
        if ("if" not in t and "when" not in t and "must" not in t and "always" not in t):
            return False, "refuse: rule not generalizable enough"

    # 4) 太短/太噪声也不写(可调)
    if len(item.text.strip()) < 20:
        return False, "refuse: too short"

    return True, "ok"


def maybe_write_memory(store: MemoryStoreSimple, raw_text: str, mem_type: str, tags=None, meta=None, confidence=0.8):
    tags = tags or []
    meta = meta or {}

    item = MemoryItem(
        text=raw_text.strip(),
        type=mem_type,
        tags=tags,
        meta=meta,
        confidence=confidence,
    )
    item.content_hash = stable_hash(f"{item.type}:{item.text}")

    # TTL
    ttl = DEFAULT_TTL_SECONDS.get(item.type)
    if ttl is not None:
        item.expires_at = time.time() + ttl

    ok, reason = should_write(item)
    if not ok:
        return False, reason

    store.upsert(item)
    return True, "written"


if __name__ == "__main__":
    store = MemoryStoreSimple()

    # 例 1:来自工具 observation 的 knowledge(允许写)
    ok, msg = maybe_write_memory(
        store,
        raw_text="CometAPI chat endpoint is POST /v1/chat/completions; base URL can be https://api.cometapi.com/v1.",
        mem_type="knowledge",
        tags=["cometapi", "endpoint"],
        meta={"source_type": "tool_observation", "source_id": "doc:cometapi#chat"}
    )
    print("write1:", ok, msg)

    # 例 2:模型猜的“事实”(拒绝写)
    ok, msg = maybe_write_memory(
        store,
        raw_text="CometAPI is always stable and will never shut down.",
        mem_type="knowledge",
        tags=["cometapi"],
        meta={"source_type": "model_inference"}
    )
    print("write2:", ok, msg)

    # 例 3:可泛化 rule(允许写)
    ok, msg = maybe_write_memory(
        store,
        raw_text="When answering with retrieved context, always cite the chunk ids used; if not found, say I don't know.",
        mem_type="rule",
        tags=["rag", "safety"],
        meta={"source_type": "user_statement", "source_id": "chat:turn123"}
    )
    print("write3:", ok, msg)

    print("\nStored:", len(store.items))
    for it in store.items:
        print("-", it.type, it.text, "| src:", it.meta.get("source_type"))

3. 写入触发时机

  • 每轮对话都写?还是任务结束写?
  • 失败后写 reflection?成功后写 rule?

4. 写入内容总结(Summarizer/Extractor)

把一次长对话压缩成 2–3 条可检索记忆

为什么要?

  • 对话太长
  • 噪声太多
  • Prompt 塞不下

示例

Summarize the following interaction into:
- Learned rules
- Learned patterns
- Things to avoid

然后只写总结进长期记忆。


5. 记忆作用域(scope)

global / project / user / session


6. Memory Decay(遗忘机制)

不遗忘 = 灾难

工程策略

  • 时间衰减
  • 使用频率衰减
  • 置信度衰减
effective_score = similarity * decay_factor

7. Meta Memory(元记忆)

这是高级 Agent 的标志

是什么?

Agent 对“自己”的记忆

例子:

  • “我在时间边界问题上容易误读 after/before”
  • “我使用 search 工具时容易重复 query”

用途

  • Planner 调整策略
  • Reflexion 更快
  • 个性稳定