AI Agent 记忆架构设计:从短期上下文到长期知识管理的工程实践
AI Agent 记忆架构设计:从短期上下文到长期知识管理的工程实践
当 ChatGPT 第一次展现出”记住”对话上下文的能力时,我们惊叹于大语言模型的涌现能力。但随着 AI Agent 从简单的对话工具进化为能够自主执行复杂任务的智能体,记忆不再是一个锦上添花的功能,而是决定 Agent 能否真正”智能”的核心架构组件。
本文将深入探讨 AI Agent 记忆系统的设计原理与工程实践,从认知科学的启发到分布式系统的实现,从向量数据库的选择到记忆压缩算法,力求为构建生产级 Agent 系统提供一套完整的技术参考。
一、为什么 Agent 需要记忆?
1.1 从 Stateless 到 Stateful 的范式转变
早期的 LLM 应用大多是 Stateless(无状态)的:每次请求都是独立的,模型不记得你是谁,不记得你们之前聊过什么。这种模式在简单问答场景下工作良好,但对于需要持续交互的 Agent 来说,存在根本性的局限:
- 上下文断裂:无法维持跨会话的连贯性
- 知识无法累积:每次都要重新交代背景信息
- 个性化缺失:无法根据用户历史行为进行适配
- 任务无法持续:复杂多步任务难以在多次交互中推进
2025 年以来,随着 Claude、OpenAI、DeepSeek 等厂商相继推出具备百万级上下文窗口的模型,以及专门的记忆 API,Stateful Agent 正在成为新的技术范式。
1.2 记忆的三个层级
借鉴认知心理学的研究,我们可以将 Agent 的记忆系统划分为三个层级:
| 记忆类型 | 类比人类认知 | 时间尺度 | 存储介质 | 典型容量 |
|---|---|---|---|---|
| 感觉记忆 (Sensory Memory) | 瞬时感知 | 毫秒-秒 | 模型上下文窗口 | 128K-2M tokens |
| 工作记忆 (Working Memory) | 当前任务上下文 | 分钟-小时 | 上下文窗口 + 缓存 | 可扩展 |
| 长期记忆 (Long-term Memory) | 知识和经验 | 天-永久 | 外部存储 | 理论上无上限 |
这种分层架构的优势在于:不同层级的记忆可以针对不同的访问模式和存储成本进行优化。
二、感觉记忆:上下文窗口的工程艺术
2.1 上下文窗口的物理限制
即使是目前最先进的模型(如 Claude 3.7 Sonnet 的 200K 上下文、Gemini 的 1M+ 上下文),上下文窗口仍然是一种稀缺资源。理解这一点对于设计高效的 Agent 系统至关重要。
# 一个直观的计算示例
# 假设平均每个 token 对应 0.75 个英文单词
# 200K 上下文 ≈ 150K 单词 ≈ 300 页标准书籍
# 但实际可用的上下文往往更少:
# - 系统提示词占用 ~1-5K tokens
# - 工具定义占用 ~5-20K tokens(如果使用大量工具)
# - 输出预留 ~4-8K tokens
# - 实际留给对话历史的:~150K tokens2.2 上下文压缩技术
当历史对话超过上下文限制时,我们需要进行压缩。以下是几种主流技术:
2.2.1 滑动窗口 (Sliding Window)
最简单的策略:只保留最近的 N 轮对话。
class SlidingWindowMemory:
def __init__(self, max_turns: int = 10):
self.max_turns = max_turns
self.history = []
def add(self, message: Message):
self.history.append(message)
if len(self.history) > self.max_turns * 2: # *2 for user+assistant pairs
self.history = self.history[2:] # Remove oldest turn
def get_context(self) -> List[Message]:
return self.history优点:实现简单,保证最新的上下文一定被保留 缺点:可能丢失关键的早期信息
2.2.2 摘要压缩 (Summarization)
使用 LLM 本身来压缩历史对话:
class SummarizationMemory:
def __init__(self, llm_client, threshold_tokens: int = 10000):
self.llm = llm_client
self.threshold = threshold_tokens
self.recent_messages = []
self.summary = ""
async def add(self, message: Message):
self.recent_messages.append(message)
# 检查是否需要压缩
if self._estimate_tokens() > self.threshold:
# 将旧消息压缩成摘要
old_messages = self.recent_messages[:-5] # Keep last 5 turns
self.summary = await self._summarize(old_messages)
self.recent_messages = self.recent_messages[-5:]
async def _summarize(self, messages: List[Message]) -> str:
prompt = f"""Summarize the following conversation, preserving key facts,
user preferences, and decisions made:
{format_messages(messages)}
Summary:"""
return await self.llm.complete(prompt)优点:智能保留关键信息 缺点:需要额外的 LLM 调用,增加成本和延迟
2.2.3 选择性保留 (Selective Retention)
基于重要性评分来决定保留哪些消息:
class SelectiveMemory:
def __init__(self, llm_client, max_tokens: int = 120000):
self.llm = llm_client
self.max_tokens = max_tokens
self.messages = []
self.importance_scores = {}
async def add(self, message: Message):
# 评估消息重要性
importance = await self._score_importance(message)
self.messages.append(message)
self.importance_scores[message.id] = importance
def get_context(self) -> List[Message]:
# 按重要性排序,在 token 限制内选择最重要的消息
sorted_msgs = sorted(
self.messages,
key=lambda m: self.importance_scores.get(m.id, 0),
reverse=True
)
selected = []
total_tokens = 0
for msg in sorted_msgs:
tokens = estimate_tokens(msg)
if total_tokens + tokens <= self.max_tokens:
selected.append(msg)
total_tokens += tokens
# 按时间顺序返回
return sorted(selected, key=lambda m: m.timestamp)2.3 工具调用与上下文的博弈
在 Function Calling 场景下,工具调用的结果会占用大量上下文空间。一个典型的工具调用可能包含:
{
"tool_call_id": "call_abc123",
"role": "assistant",
"content": null,
"tool_calls": [{
"id": "call_abc123",
"type": "function",
"function": {
"name": "search_database",
"arguments": "{\"query\": \"user preferences\", \"limit\": 10}"
}
}]
}如果工具返回大量数据(如搜索结果、代码文件、数据库查询),上下文会迅速被填满。工程实践中,我们通常会对工具返回进行截断或摘要。
三、工作记忆:Agent 的”便签纸”
3.1 工作记忆的作用
工作记忆是 Agent 在执行任务时的”临时工作台”。它存储:
- 当前任务目标:Agent 正在尝试完成什么
- 中间结果:多步骤任务中已完成的子任务结果
- 待办事项:还需要执行的步骤
- 关键参数:如用户 ID、会话 ID、任务 ID 等
3.2 实现模式:Scratchpad
一种常见的实现是 Scratchpad(草稿本) 模式:
class ScratchpadMemory:
"""Agent 的工作记忆实现"""
def __init__(self):
self.scratchpad = []
self.max_entries = 50
def write(self, entry: str, category: str = "general"):
"""向草稿本添加条目"""
self.scratchpad.append({
"content": entry,
"category": category,
"timestamp": datetime.now().isoformat(),
"id": generate_id()
})
# 保持草稿本大小可控
if len(self.scratchpad) > self.max_entries:
self.scratchpad = self.scratchpad[-self.max_entries:]
def read(self, category: Optional[str] = None) -> List[Dict]:
"""读取草稿本内容"""
if category:
return [e for e in self.scratchpad if e["category"] == category]
return self.scratchpad
def clear(self, category: Optional[str] = None):
"""清空草稿本"""
if category:
self.scratchpad = [e for e in self.scratchpad if e["category"] != category]
else:
self.scratchpad = []
def format_for_llm(self) -> str:
"""格式化为 LLM 可读的字符串"""
if not self.scratchpad:
return "Scratchpad is empty."
lines = ["=== Agent Scratchpad ==="]
for entry in self.scratchpad:
lines.append(f"[{entry['category']}] {entry['content']}")
return "\n".join(lines)3.3 与 LLM 的集成
在实际的 Agent 系统中,工作记忆通常作为系统提示词的一部分:
SYSTEM_PROMPT_TEMPLATE = """You are an AI assistant with access to tools.
## Current Task
{task_description}
## Working Memory (Scratchpad)
{scratchpad_content}
## Instructions
1. Use the scratchpad to track your progress
2. Write important intermediate results to the scratchpad
3. Check the scratchpad before calling tools to avoid redundant operations
4. When you complete a task, summarize the results
Available tools: {tool_descriptions}
"""四、长期记忆:Agent 的”知识库”
4.1 长期记忆的分类
长期记忆可以进一步细分为:
| 类型 | 描述 | 示例 |
|---|---|---|
| 事实记忆 (Facts) | 关于世界和用户的客观信息 | 用户的职业、偏好设置 |
| 情节记忆 (Episodic) | 具体的事件和经历 | 上次对话的内容、完成的任务 |
| 程序记忆 (Procedural) | “如何做”的知识 | 用户的常用工作流程 |
| 语义记忆 (Semantic) | 概念和关系 | 领域知识、术语定义 |
4.2 向量数据库:长期记忆的基石
现代 Agent 系统的长期记忆几乎总是基于向量检索(RAG)实现。以下是主流向量数据库的对比:
| 数据库 | 特点 | 适用场景 |
|---|---|---|
| Pinecone | 全托管、自动扩缩容 | 快速启动、不想运维 |
| Weaviate | 开源、GraphQL 接口 | 需要复杂查询能力 |
| Chroma | 轻量、易嵌入 | 本地开发、小型项目 |
| Qdrant | Rust 编写、高性能 | 需要低延迟检索 |
| Milvus | 云原生、分布式 | 大规模生产环境 |
4.3 记忆存储的完整流程
class LongTermMemory:
"""长期记忆管理器"""
def __init__(self, vector_store, embedding_model):
self.vector_store = vector_store
self.embedder = embedding_model
async def store(self, content: str, metadata: Dict, memory_type: str = "episodic"):
"""存储记忆"""
# 1. 生成向量嵌入
embedding = await self.embedder.embed(content)
# 2. 构建记忆文档
memory_doc = {
"id": generate_id(),
"content": content,
"embedding": embedding,
"metadata": {
**metadata,
"type": memory_type,
"timestamp": datetime.now().isoformat(),
"importance": await self._calculate_importance(content)
}
}
# 3. 存入向量数据库
await self.vector_store.upsert([memory_doc])
return memory_doc["id"]
async def retrieve(self, query: str, filters: Optional[Dict] = None, top_k: int = 5) -> List[Dict]:
"""检索记忆"""
# 1. 向量化查询
query_embedding = await self.embedder.embed(query)
# 2. 向量相似度搜索
results = await self.vector_store.search(
query_embedding=query_embedding,
filters=filters,
top_k=top_k
)
# 3. 重排序(可选)
results = await self._rerank(results, query)
return results
async def _calculate_importance(self, content: str) -> float:
"""计算记忆重要性分数"""
# 可以基于关键词、情感分析、用户反馈等
# 简单实现:基于内容长度和关键词
score = 0.5
# 包含关键决策词加分
decision_keywords = ["decided", "chose", "agreed", "confirmed", "important"]
for keyword in decision_keywords:
if keyword in content.lower():
score += 0.1
return min(score, 1.0)4.4 记忆的遗忘机制
人类会遗忘,Agent 也应该有遗忘机制。这不仅是出于存储成本的考虑,更是为了保持记忆的相关性和准确性。
class MemoryDecay:
"""记忆衰减管理"""
def __init__(self, half_life_days: float = 30.0):
self.half_life = half_life_days
def calculate_relevance(self, memory: Dict, current_time: datetime) -> float:
"""计算记忆的相关性分数(0-1)"""
# 时间衰减
memory_time = datetime.fromisoformat(memory["metadata"]["timestamp"])
days_passed = (current_time - memory_time).days
# 指数衰减公式
time_decay = 0.5 ** (days_passed / self.half_life)
# 重要性权重
importance = memory["metadata"].get("importance", 0.5)
# 访问频率加成
access_count = memory["metadata"].get("access_count", 0)
access_boost = min(access_count * 0.05, 0.3)
return (time_decay * 0.6 + importance * 0.3 + access_boost * 0.1)
async def cleanup(self, threshold: float = 0.2):
"""清理低相关性记忆"""
current_time = datetime.now()
all_memories = await self.vector_store.get_all()
to_delete = []
for memory in all_memories:
relevance = self.calculate_relevance(memory, current_time)
if relevance < threshold:
to_delete.append(memory["id"])
if to_delete:
await self.vector_store.delete(to_delete)
print(f"Cleaned up {len(to_delete)} outdated memories")五、多 Agent 场景下的记忆共享
5.1 为什么需要记忆共享?
在多 Agent 系统中,Agent 之间需要协作,这就需要:
- 共享上下文:Agent A 需要知道 Agent B 已经完成了什么
- 避免重复工作:避免多个 Agent 重复执行相同任务
- 知识复用:一个 Agent 学到的知识可以被其他 Agent 利用
- 一致性保证:用户对系统的认知保持一致
5.2 共享记忆架构
class SharedMemoryLayer:
"""多 Agent 共享记忆层"""
def __init__(self, redis_client, vector_store):
self.redis = redis_client # 用于实时同步
self.vector_store = vector_store # 用于长期存储
async def publish_event(self, agent_id: str, event_type: str, data: Dict):
"""发布事件到共享内存"""
event = {
"agent_id": agent_id,
"type": event_type,
"data": data,
"timestamp": datetime.now().isoformat()
}
# 发布到 Redis Pub/Sub
await self.redis.publish("agent_events", json.dumps(event))
# 持久化到向量数据库
if event_type in ["task_completed", "knowledge_learned"]:
await self._persist_event(event)
async def subscribe_events(self, agent_id: str, event_types: List[str]):
"""订阅共享事件"""
pubsub = self.redis.pubsub()
await pubsub.subscribe("agent_events")
async for message in pubsub.listen():
if message["type"] == "message":
event = json.loads(message["data"])
# 过滤自己发出的事件
if event["agent_id"] != agent_id:
if event["type"] in event_types:
yield event
async def _persist_event(self, event: Dict):
"""将事件持久化为可检索的记忆"""
content = f"{event['type']}: {json.dumps(event['data'])}"
await self.vector_store.store(
content=content,
metadata={
"agent_id": event["agent_id"],
"event_type": event["type"],
"timestamp": event["timestamp"]
}
)5.3 记忆冲突解决
当多个 Agent 同时更新共享记忆时,可能出现冲突:
class MemoryConflictResolver:
"""记忆冲突解决器"""
async def resolve(self, existing: Dict, incoming: Dict) -> Dict:
"""解决记忆冲突"""
# 策略 1:时间戳优先(Last-Write-Wins)
existing_time = datetime.fromisoformat(existing["metadata"]["timestamp"])
incoming_time = datetime.fromisoformat(incoming["metadata"]["timestamp"])
if incoming_time > existing_time:
return incoming
# 策略 2:合并(对于列表类型的记忆)
if self._is_mergeable(existing, incoming):
return self._merge_memories(existing, incoming)
# 策略 3:保留两者,标记冲突
return self._mark_conflict(existing, incoming)
def _is_mergeable(self, existing: Dict, incoming: Dict) -> bool:
"""判断是否可以合并"""
# 例如:任务列表可以合并
return existing.get("memory_type") == "task_list"
def _merge_memories(self, existing: Dict, incoming: Dict) -> Dict:
"""合并记忆"""
merged = existing.copy()
# 合并任务列表
if existing.get("memory_type") == "task_list":
existing_tasks = set(existing["content"].split(","))
incoming_tasks = set(incoming["content"].split(","))
merged["content"] = ",".join(existing_tasks | incoming_tasks)
merged["metadata"]["merged_from"] = [existing["id"], incoming["id"]]
merged["metadata"]["timestamp"] = datetime.now().isoformat()
return merged六、生产环境的最佳实践
6.1 性能优化
class MemoryCache:
"""记忆缓存层"""
def __init__(self, redis_client, ttl: int = 300):
self.redis = redis_client
self.ttl = ttl
self.local_cache = {} # L1 缓存
async def get(self, key: str) -> Optional[Dict]:
"""多级缓存读取"""
# L1: 本地内存
if key in self.local_cache:
return self.local_cache[key]
# L2: Redis
data = await self.redis.get(f"memory:{key}")
if data:
memory = json.loads(data)
self.local_cache[key] = memory # 回填 L1
return memory
return None
async def set(self, key: str, memory: Dict):
"""多级缓存写入"""
# 写入 L1
self.local_cache[key] = memory
# 写入 L2
await self.redis.setex(
f"memory:{key}",
self.ttl,
json.dumps(memory)
)6.2 隐私与安全
记忆系统存储了大量用户数据,隐私保护至关重要:
class PrivacyFilter:
"""隐私过滤器"""
SENSITIVE_PATTERNS = [
r"\b\d{16}\b", # 信用卡号
r"\b\d{3}-\d{2}-\d{4}\b", # SSN
r"password[:\s]+\S+", # 密码
r"api[_-]?key[:\s]+\S+", # API Key
]
def sanitize(self, content: str) -> str:
"""过滤敏感信息"""
sanitized = content
for pattern in self.SENSITIVE_PATTERNS:
sanitized = re.sub(pattern, "[REDACTED]", sanitized, flags=re.IGNORECASE)
return sanitized
def should_store(self, content: str, user_consent: bool) -> bool:
"""判断是否应该存储"""
if not user_consent:
return False
# 检查是否包含高度敏感信息
if self._contains_high_risk_data(content):
return False
return True
def _contains_high_risk_data(self, content: str) -> bool:
"""检测高风险数据"""
high_risk_patterns = [
r"\b\d{16}\b", # 信用卡号
]
for pattern in high_risk_patterns:
if re.search(pattern, content):
return True
return False6.3 监控与可观测性
class MemoryMetrics:
"""记忆系统监控"""
def __init__(self):
self.storage_size = Gauge("memory_storage_size_bytes", "Total memory storage size")
self.query_latency = Histogram("memory_query_duration_seconds", "Memory query latency")
self.cache_hit_rate = Gauge("memory_cache_hit_rate", "Cache hit rate")
self.memory_count = Gauge("memory_total_count", "Total number of memories")
async def record_query(self, duration: float, cache_hit: bool):
"""记录查询指标"""
self.query_latency.observe(duration)
if cache_hit:
self.cache_hits += 1
else:
self.cache_misses += 1
def get_stats(self) -> Dict:
"""获取统计信息"""
return {
"total_memories": self.memory_count._value.get(),
"storage_size_mb": self.storage_size._value.get() / 1024 / 1024,
"cache_hit_rate": self.cache_hit_rate._value.get(),
"avg_query_latency_ms": self.query_latency._sum.get() / self.query_latency._count.get() * 1000
}七、前沿探索:记忆的未来
7.1 神经符号记忆
将神经网络与符号推理结合,实现:
- 结构化知识:将非结构化文本转化为知识图谱
- 逻辑推理:基于记忆进行因果推断
- 可解释性:知道 Agent 为什么做出某个决策
7.2 记忆增强生成 (Memory-Augmented Generation)
类似人类的发散思维,Agent 可以:
- 联想检索:不仅检索直接相关记忆,还检索关联记忆
- 创造性组合:将不同领域的记忆组合产生新想法
- 反事实推理:“如果当时做了不同的选择…“
7.3 终身学习 (Lifelong Learning)
Agent 能够:
- 持续学习:从新交互中不断更新知识
- 知识整合:将新知识融入现有知识框架
- 防止遗忘:在学习新知识时保留旧知识(解决 catastrophic forgetting)
结语
AI Agent 的记忆架构设计是一个涉及认知科学、分布式系统和机器学习的交叉领域。从简单的上下文窗口到复杂的分层记忆系统,从单机缓存到分布式共享,记忆系统的演进反映了 AI Agent 从”工具”向”智能体”的进化。
设计一个好的记忆系统,需要权衡:
- 存储成本 vs 检索质量
- 实时性 vs 一致性
- 个性化 vs 隐私保护
- 简单性 vs 功能性
没有银弹,只有根据具体场景不断迭代优化。希望本文能为你的 Agent 记忆系统设计提供一些思路和参考。
参考资源
- LangChain Memory Documentation
- MemGPT: Towards LLMs as Operating Systems
- Voyager: An Open-Ended Embodied Agent with Large Language Models
- Generative Agents: Interactive Simulacra of Human Behavior
- Claude Memory API Documentation
本文首发于 illli.dev,转载请注明出处。