924 字
3 分钟
Agent 成本优化:Token 节省与缓存策略
前言
Agent 的成本主要来自 Token 消耗。每次工具调用都在消耗 Token。本章讲解如何有效控制成本,从 Token 优化、模型路由、语义缓存到批量处理,全方位降低 Agent 运行费用。
一、成本结构分析
1.1 Token 成本构成
@dataclassclass CostBreakdown: system_prompt: int # 系统提示词(只算一次) conversation_history: int # 对话历史 tool_result: int # 工具返回结果 final_response: int # 最终响应
# 估算成本def estimate_cost(breakdown: CostBreakdown) -> float: return ( breakdown.system_prompt * 3.0 + # $3/1M tokens breakdown.conversation_history * 3.0 + breakdown.tool_result * 0.6 + # $0.6/1M breakdown.final_response * 3.0 ) / 1_000_0001.2 成本热点
| 热点 | 占比 | 优化空间 |
|---|---|---|
| 系统提示词 | ~30% | Prompt 压缩 |
| 工具返回 | ~40% | 结果缓存 |
| 对话历史 | ~20% | 摘要压缩 |
| 最终响应 | ~10% | 限制长度 |
1.3 Agent 成本的特殊性
与传统 API 调用不同,Agent 的单次请求成本差异巨大:
简单问答: 1 次 LLM 调用 → 约 500 tokens → $0.001工具调用: 3 次 LLM 调用 → 约 3000 tokens → $0.006深度研究: 10 次 LLM 调用 → 约 15000 tokens → $0.03多 Agent 协作: 20 次 LLM 调用 → 约 50000 tokens → $0.10成本随任务复杂度非线性增长。一次深度研究任务的成本可能是简单问答的 100 倍。
1.4 各模型价格对比
| 模型 | 输入价格 ($/1M tokens) | 输出价格 ($/1M tokens) | 性能等级 |
|---|---|---|---|
| GPT-4o | 2.50 | 10.00 | 高 |
| GPT-4o-mini | 0.15 | 0.60 | 中 |
| Claude Sonnet 4 | 3.00 | 15.00 | 高 |
| Claude Haiku 3.5 | 0.80 | 4.00 | 中 |
| DeepSeek Chat | 0.14 | 0.28 | 中 |
| DeepSeek Reasoner | 0.55 | 2.19 | 高 |
| Gemini 2.0 Flash | 0.10 | 0.40 | 中 |
| Gemini 2.5 Pro | 1.25 | 10.00 | 高 |
| 本地 Llama 3.3 70B | 0 (电费) | 0 (电费) | 中 |
二、Prompt 压缩
2.1 系统提示词精简
# 精简前system_prompt_old = """你是一个专业的研究助手。你擅长信息检索、总结和分析。你有以下工具可以使用:1. search - 搜索网络2. scrape - 抓取网页3. calculator - 计算请基于事实回答,不要编造信息。"""
# 精简后system_prompt_new = """你是一个研究助手。使用 search/scrape/calculator。基于事实回答。"""2.2 对话历史压缩
def compress_history(messages: list, max_tokens: int = 2000) -> list: """对话历史压缩""" if count_tokens(messages) <= max_tokens: return messages
# 保留系统 + 最近 N 轮 system = messages[0] # 系统消息 recent = messages[-5:] # 最近5轮
# 对旧消息摘要 old_messages = messages[1:-5] summary = summarize_conversation(old_messages)
return [system, summary] + recent2.3 自动化 Prompt 压缩
使用 DSPy 或类似框架自动压缩 Prompt,在保持效果的同时减少 Token:
class PromptCompressor: """Prompt 自动压缩器"""
async def compress(self, prompt: str, compression_ratio: float = 0.5) -> str: """将 Prompt 压缩到指定比例""" target_tokens = int(count_tokens(prompt) * compression_ratio)
compressed = await llm.complete(f"""请将以下 Prompt 压缩到约 {target_tokens} tokens,保持核心指令不变,去除冗余描述。
原始 Prompt:{prompt}
压缩后 Prompt:""")
return compressed.strip()
async def compress_tool_descriptions(self, tools: list[dict]) -> list[dict]: """压缩工具描述""" compressed = [] for tool in tools: short_desc = await self.compress(tool["description"], compression_ratio=0.4) compressed.append({ "name": tool["name"], "description": short_desc, "parameters": self._shorten_params(tool["parameters"]), }) return compressed
def _shorten_params(self, params: dict) -> dict: """精简参数定义""" required = params.get("required", []) properties = {} for name, schema in params.get("properties", {}).items(): if name in required: # 必填参数保留类型和简短描述 properties[name] = {"type": schema.get("type", "string")} # 可选参数:根据需要保留或移除 return {"type": "object", "required": required, "properties": properties}2.4 动态 Prompt 选择
根据查询复杂度动态选择详细的还是精简的 Prompt:
class DynamicPromptSelector: """动态 Prompt 选择器"""
def __init__(self): self.prompts = { "full": FULL_SYSTEM_PROMPT, # 约 800 tokens "compact": COMPACT_SYSTEM_PROMPT, # 约 200 tokens "minimal": MINIMAL_SYSTEM_PROMPT, # 约 50 tokens }
def select(self, query: str, context: dict) -> str: complexity = self._estimate_complexity(query, context)
if complexity < 0.3: return self.prompts["minimal"] elif complexity < 0.7: return self.prompts["compact"] else: return self.prompts["full"]
def _estimate_complexity(self, query: str, context: dict) -> float: """评估查询复杂度 (0-1)""" score = 0.0
# 长查询更复杂 if len(query) > 200: score += 0.2
# 需要多步骤的任务更复杂 if any(kw in query for kw in ["分析", "比较", "总结", "研究"]): score += 0.3
# 需要工具调用的任务更复杂 if context.get("tools_available"): score += 0.2
# 历史对话长更复杂 if context.get("history_length", 0) > 10: score += 0.2
return min(score, 1.0)三、结果缓存
3.1 工具结果缓存
from cachetools import cachedimport hashlib
@cached(cache={}, key=lambda query: hashlib.md5(query.encode()).hexdigest())async def cached_search(query: str) -> str: """搜索结果缓存""" return await search(query)3.2 Embedding 缓存
from redis import Redis
redis_client = Redis()
def cache_embedding(text: str, embedding: list[float]): key = f"embedding:{hash(text)}" redis_client.setex(key, ttl=86400, value=json.dumps(embedding))
def get_cached_embedding(text: str) -> Optional[list[float]]: key = f"embedding:{hash(text)}" return redis_client.get(key)3.3 语义缓存
语义缓存不是精确匹配,而是根据语义相似度返回缓存结果。这是 Agent 场景下最有效的缓存策略:
flowchart TD
A["用户查询"] --> B{"语义缓存命中?<br/>相似度 > 阈值"}
B -->|"命中"| C["返回缓存结果<br/>成本: $0"]
B -->|"未命中"| D["调用 LLM<br/>成本: $0.01"]
D --> E["存入缓存"]
E --> F["返回结果"]
import numpy as np
class SemanticCache: """语义缓存:基于向量相似度的缓存"""
def __init__( self, embedding_model, similarity_threshold: float = 0.92, ttl_seconds: int = 3600, ): self.embedding_model = embedding_model self.similarity_threshold = similarity_threshold self.ttl = ttl_seconds self.cache_store = {} # 生产环境用 Redis + 向量数据库
async def get(self, query: str) -> tuple[str | None, float]: """查询缓存,返回 (缓存结果, 相似度)""" query_embedding = await self.embedding_model.embed(query)
best_match = None best_similarity = 0.0
for key, entry in self.cache_store.items(): # 检查 TTL if self._is_expired(entry): continue
similarity = cosine_similarity(query_embedding, entry["embedding"]) if similarity > best_similarity: best_similarity = similarity best_match = entry["result"]
if best_similarity >= self.similarity_threshold: return best_match, best_similarity return None, best_similarity
async def set(self, query: str, result: str): """存入缓存""" embedding = await self.embedding_model.embed(query) self.cache_store[hashlib.md5(query.encode()).hexdigest()] = { "query": query, "result": result, "embedding": embedding, "timestamp": time.time(), }
def _is_expired(self, entry: dict) -> bool: return time.time() - entry["timestamp"] > self.ttl
def cosine_similarity(a: list[float], b: list[float]) -> float: a_np = np.array(a) b_np = np.array(b) return float(np.dot(a_np, b_np) / (np.linalg.norm(a_np) * np.linalg.norm(b_np)))3.4 多级缓存架构
生产环境需要多级缓存,从内存到 Redis 再到向量数据库:
class MultiLevelCache: """多级缓存"""
def __init__(self): self.l1 = LRUCache(maxsize=100) # 内存缓存:最快,容量小 self.l2 = RedisCache(ttl=3600) # Redis:精确匹配 self.l3 = VectorDBCache(threshold=0.9) # 向量库:语义匹配
async def get(self, query: str) -> str | None: # L1: 内存精确匹配 result = self.l1.get(query) if result: return result
# L2: Redis 精确匹配 result = await self.l2.get(query) if result: self.l1.set(query, result) # 回填 L1 return result
# L3: 向量库语义匹配 result, similarity = await self.l3.search(query) if result and similarity > self.l3.threshold: self.l1.set(query, result) # 回填 await self.l2.set(query, result) return result
return None
async def set(self, query: str, result: str): """写入所有缓存层""" self.l1.set(query, result) await self.l2.set(query, result) await self.l3.add(query, result)3.5 缓存效果预估
| 缓存类型 | 命中率 | 节省成本 | 适用场景 |
|---|---|---|---|
| 精确匹配 | 10-20% | 10-20% | FAQ、重复查询 |
| 语义缓存 | 30-50% | 30-50% | 客服、知识问答 |
| 工具结果缓存 | 40-60% | 20-40% | 搜索、数据查询 |
| 多级组合 | 50-70% | 50-70% | 高流量生产环境 |
四、智能路由
4.1 简单 vs 复杂路由
async def route(query: str) -> str: """根据查询复杂度选择处理路径""" complexity = estimate_complexity(query)
if complexity == "simple": return await simple_rag(query) # 便宜快速 else: return await deep_research(query) # 昂贵但深入4.2 小模型路由
async def cheap_first(query: str) -> str: """先用小模型尝试,失败再调用大模型""" try: # 先用便宜模型 result = await llm.mini.complete(query) if is_satisfactory(result): return result except QualityCheckFailed: pass
# 回退到大模型 return await llm.pro.complete(query)4.3 完整的智能路由系统
智能路由是成本优化最有效的手段之一。核心思想:简单任务用便宜模型,复杂任务用贵模型。
from enum import Enum
class ModelTier(Enum): CHEAP = "cheap" # GPT-4o-mini, Haiku, Flash STANDARD = "standard" # GPT-4o, Sonnet PREMIUM = "premium" # Claude Opus, o1
class ModelRouter: """智能模型路由器"""
TIER_MODELS = { ModelTier.CHEAP: ["gpt-4o-mini", "claude-haiku-3.5", "gemini-2.0-flash"], ModelTier.STANDARD: ["gpt-4o", "claude-sonnet-4", "deepseek-chat"], ModelTier.PREMIUM: ["claude-opus-4", "o1", "deepseek-reasoner"], }
def __init__(self): self.routing_rules = self._load_rules() self.fallback_enabled = True
async def route(self, query: str, context: dict | None = None) -> tuple[str, ModelTier]: """为查询选择最合适的模型""" complexity = await self._classify_complexity(query, context)
if complexity.score < 0.3: tier = ModelTier.CHEAP elif complexity.score < 0.7: tier = ModelTier.STANDARD else: tier = ModelTier.PREMIUM
# 从对应层级中选择模型 model = self._select_from_tier(tier, complexity) return model, tier
async def _classify_complexity(self, query: str, context: dict | None) -> dict: """分类查询复杂度""" score = 0.0 signals = []
# 信号 1: 查询长度 if len(query) > 500: score += 0.2 signals.append("长查询") elif len(query) < 50: score -= 0.1 signals.append("短查询")
# 信号 2: 是否需要推理 reasoning_keywords = ["分析", "比较", "为什么", "如何", "evaluate", "compare", "why"] if any(kw in query.lower() for kw in reasoning_keywords): score += 0.25 signals.append("需要推理")
# 信号 3: 是否需要工具调用 tool_keywords = ["搜索", "查询", "计算", "查找", "search", "calculate"] if any(kw in query.lower() for kw in tool_keywords): score += 0.15 signals.append("需要工具")
# 信号 4: 是否涉及代码 code_indicators = ["代码", "函数", "bug", "code", "function", "debug"] if any(kw in query.lower() for kw in code_indicators): score += 0.2 signals.append("涉及代码")
# 信号 5: 历史对话轮次 if context and context.get("turn_count", 0) > 5: score += 0.1 signals.append("多轮对话")
return { "score": max(0, min(1, score)), "signals": signals, }
def _select_from_tier(self, tier: ModelTier, complexity: dict) -> str: """从指定层级选择具体模型""" models = self.TIER_MODELS[tier] # 简单的轮询策略,生产环境可加入更多考量 return models[0]
class CascadingRouter: """级联回退路由器:先用便宜模型,不行再升级"""
def __init__(self, router: ModelRouter): self.router = router self.quality_threshold = 0.7
async def route_with_fallback(self, query: str) -> str: """带回退的路由""" # 先尝试便宜模型 model, tier = self.router.route(query, context=None)
if tier == ModelTier.CHEAP: result = await self._call_model(model, query) quality = await self._assess_quality(query, result)
if quality >= self.quality_threshold: return result
# 便宜模型不行,升级到标准模型 model = self.router.TIER_MODELS[ModelTier.STANDARD][0]
result = await self._call_model(model, query)
# 如果标准模型也不行(不太常见),再升级 quality = await self._assess_quality(query, result) if quality < self.quality_threshold and model not in self.router.TIER_MODELS[ModelTier.PREMIUM]: premium_model = self.router.TIER_MODELS[ModelTier.PREMIUM][0] result = await self._call_model(premium_model, query)
return result
async def _call_model(self, model: str, query: str) -> str: return await llm.complete(query, model=model)
async def _assess_quality(self, query: str, response: str) -> float: """快速质量评估(用便宜模型)""" assessment = await llm.complete( f"评估以下回答是否充分回答了问题。只输出 0-1 的分数。\n\n" f"问题: {query}\n回答: {response[:500]}\n\n分数:", model="gpt-4o-mini", # 用便宜模型做质量检查 ) try: return float(assessment.strip()) except ValueError: return 0.5 # 解析失败,默认中等质量4.4 路由效果预估
| 策略 | 成本节省 | 质量影响 | 实现复杂度 |
|---|---|---|---|
| 固定便宜模型 | 70-80% | 质量下降 | 低 |
| 简单/复杂路由 | 40-60% | 几乎无 | 低 |
| 级联回退 | 30-50% | 无 | 中 |
| LLM 路由(小模型分类) | 35-55% | 无 | 中 |
五、批量处理与并行化
5.1 批量请求合并
将多个独立的 LLM 调用合并为一个批量请求,减少 API 调用开销:
import asynciofrom collections import defaultdict
class BatchProcessor: """LLM 批量请求处理器"""
def __init__(self, max_batch_size: int = 10, max_wait_seconds: float = 0.5): self.max_batch_size = max_batch_size self.max_wait = max_wait_seconds self.queue: list[tuple[str, asyncio.Future]] = [] self.lock = asyncio.Lock()
async def submit(self, prompt: str) -> str: """提交单个请求,等待批量处理""" loop = asyncio.get_event_loop() future = loop.create_future()
async with self.lock: self.queue.append((prompt, future))
# 队列满了就立即处理 if len(self.queue) >= self.max_batch_size: await self._flush()
return await future
async def _flush(self): """批量发送""" batch = self.queue[:self.max_batch_size] self.queue = self.queue[self.max_batch_size:]
prompts = [p for p, _ in batch] futures = [f for _, f in batch]
try: # 使用批量 API(如 OpenAI Batch API) results = await batch_llm_call(prompts)
for future, result in zip(futures, results): if not future.done(): future.set_result(result) except Exception as e: for future in futures: if not future.done(): future.set_exception(e)
async def start(self): """启动定时刷新""" while True: await asyncio.sleep(self.max_wait) async with self.lock: if self.queue: await self._flush()5.2 OpenAI Batch API 使用
OpenAI 提供了专门的 Batch API,比实时调用便宜 50%:
import jsonimport time
class OpenAIBatchAPI: """OpenAI Batch API 封装"""
def __init__(self, client): self.client = client
async def submit_batch(self, tasks: list[dict]) -> str: """提交批量任务""" # 创建批量输入文件 batch_lines = [] for i, task in enumerate(tasks): batch_lines.append(json.dumps({ "custom_id": f"task-{i}", "method": "POST", "url": "/v1/chat/completions", "body": { "model": task.get("model", "gpt-4o-mini"), "messages": task["messages"], "max_tokens": task.get("max_tokens", 1000), } }))
# 上传文件 batch_file = self.client.files.create( file="\n".join(batch_lines), purpose="batch", )
# 创建批量任务 batch = self.client.batches.create( input_file_id=batch_file.id, endpoint="/v1/chat/completions", completion_window="24h", )
return batch.id
async def wait_and_get_results(self, batch_id: str) -> list[dict]: """等待批量任务完成并获取结果""" while True: batch = self.client.batches.retrieve(batch_id)
if batch.status == "completed": # 下载结果文件 results = self.client.files.content(batch.output_file_id) return [json.loads(line) for line in results.text.strip().split("\n")] elif batch.status == "failed": raise Exception(f"Batch failed: {batch.errors}") elif batch.status in ("expired", "cancelled"): raise Exception(f"Batch {batch.status}")
time.sleep(30) # 每 30 秒检查一次5.3 并行工具调用
Agent 在需要调用多个独立工具时,应该并行执行而非串行:
class ParallelToolExecutor: """并行工具执行器"""
async def execute_tools(self, tool_calls: list[dict]) -> list[dict]: """并行执行独立的工具调用""" # 分析依赖关系 independent, dependent = self._analyze_dependencies(tool_calls)
# 并行执行独立调用 results = {} if independent: tasks = [ self._execute_single(tc) for tc in independent ] parallel_results = await asyncio.gather(*tasks, return_exceptions=True) for tc, result in zip(independent, parallel_results): results[tc["id"]] = result
# 串行执行有依赖的调用 for tc in dependent: # 注入之前的结果作为上下文 tc_with_context = self._inject_context(tc, results) result = await self._execute_single(tc_with_context) results[tc["id"]] = result
return results
def _analyze_dependencies(self, tool_calls: list[dict]) -> tuple[list, list]: """分析工具调用的依赖关系""" independent = [] dependent = []
for tc in tool_calls: # 简单启发式:如果参数引用了其他调用的结果,则依赖 params_str = json.dumps(tc.get("params", {})) if any(f"${{result.{other_id}}}" in params_str for other_id in range(len(tool_calls))): dependent.append(tc) else: independent.append(tc)
return independent, dependent六、成本监控
6.1 实时成本仪表板
# 成本监控指标agent_cost_total{category="research"} 125.50agent_cost_per_request 0.15agent_token_usage{purpose="tool_result"} 45%agent_cache_hit_rate 0.726.2 成本告警
@dataclassclass CostAlert: threshold_usd: float = 1.0 window_minutes: int = 60
async def check_cost_alert(cost_breakdown: CostBreakdown): if cost_breakdown.total_usd > alert.threshold_usd: await send_alert( f"Agent 成本超限: ${cost_breakdown.total_usd}" )6.3 成本优化 Dashboard
class CostOptimizationDashboard: """成本优化仪表板"""
def generate_report(self, period: str = "daily") -> dict: return { "period": period, "total_cost": self._get_total_cost(period), "cost_by_agent": self._get_cost_by_agent(period), "cost_by_model": self._get_cost_by_model(period), "savings": { "cache_savings": self._get_cache_savings(period), "routing_savings": self._get_routing_savings(period), "compression_savings": self._get_compression_savings(period), }, "recommendations": self._generate_recommendations(), }
def _generate_recommendations(self) -> list[str]: """生成成本优化建议""" recs = []
# 检查是否有简单任务使用了昂贵模型 expensive_for_simple = self._check_model_task_mismatch() if expensive_for_simple: recs.append( f"发现 {len(expensive_for_simple)} 个简单任务使用了昂贵模型," f"建议启用智能路由,预计节省 ${expensive_for_simple['potential_savings']:.2f}/月" )
# 检查缓存命中率 cache_hit_rate = self._get_cache_hit_rate() if cache_hit_rate < 0.3: recs.append( f"缓存命中率仅 {cache_hit_rate:.1%},建议优化缓存策略" )
# 检查 Prompt 冗余 avg_prompt_tokens = self._get_avg_prompt_tokens() if avg_prompt_tokens > 2000: recs.append( f"平均 Prompt 长度 {avg_prompt_tokens} tokens,建议启用 Prompt 压缩" )
return recs七、不同规模下的成本估算
7.1 小规模(个人项目/原型)
日均请求: 100 次平均每次: 3 次 LLM 调用, 约 2000 tokens模型: GPT-4o-mini ($0.15/$0.60 per 1M tokens)
月度估算:- Token 消耗: 100 * 3 * 2000 * 30 = 18M tokens/月- 输入成本: ~$2.70/月- 输出成本: ~$1.08/月- 总计: ~$5/月7.2 中等规模(SaaS 产品)
日均请求: 10,000 次优化前: GPT-4o 全量, 约 5000 tokens/次优化后: 70% GPT-4o-mini + 30% GPT-4o, 约 3000 tokens/次
月度估算(优化前):- 约 $4,500/月
月度估算(优化后):- 约 $1,200/月- 节省: 73%7.3 大规模(企业生产)
日均请求: 1,000,000 次策略: 智能路由 + 语义缓存 + 批量处理 + 本地模型兜底
优化措施效果:- 语义缓存: 命中率 40%, 节省 40%- 智能路由: 60% 走便宜模型, 节省 45%- Prompt 压缩: 减少 30% Token- 批量 API: 减少 50% API 调用成本- 本地模型: 20% 流量走本地, 成本接近 0
综合优化: 约 85% 成本节省八、总结
| 优化策略 | 节省比例 |
|---|---|
| Prompt 压缩 | 20-40% |
| 结果缓存 | 50-70% |
| 小模型路由 | 30-60% |
| 历史压缩 | 15-30% |
| 批量处理 | 30-50% |
| 并行执行 | 降低延迟 |
综合可节省 50%+ 成本!
8.1 成本优化实施路线图
flowchart LR
A["第 1 周<br/>接入 Token 监控<br/>了解成本分布"] --> B["第 2 周<br/>实现 Prompt 压缩<br/>+ 工具结果缓存"]
B --> C["第 3 周<br/>实现智能路由<br/>+ 语义缓存"]
C --> D["第 4 周<br/>批量处理优化<br/>+ 并行化"]
D --> E["持续优化<br/>本地模型兜底<br/>+ 成本告警"]
参考资料
支持与分享
如果这篇文章对你有帮助,欢迎支持作者或分享给更多人
Agent 成本优化:Token 节省与缓存策略
https://blog.souloss.com/posts/machine-learning/agent-guide/agent-cost-optimization/ 部分信息可能已经过时
相关文章 智能推荐
1
Agent 可靠性:重试、熔断与降级
AI 深度解读 Agent 可靠性设计——重试机制、熔断器、降级策略、限流
2
Agent 测试策略:从单元到集成
AI 深度解读 Agent 测试——单元测试、集成测试、LLM-as-judge、模糊测试
3
Agent 架构模式:Handoffs、Fan-out 与 Supervisor
AI 深度解读 Agent 架构模式——Handoffs 交接模式、Fan-out/Fan-in 并行模式、Supervisor 监督模式
4
Agent 可观测性:日志、追踪与调试
AI 深度解读 Agent 可观测性——Langfuse、OpenTelemetry 追踪、LangSmith 等工具
5
从Chatbot到Agent:打造能自主干活的AI
AI 从Chatbot到Agent——打造能自主干活的AI






