mobile wallpaper 1mobile wallpaper 2mobile wallpaper 3mobile wallpaper 4
924 字
3 分钟
Agent 成本优化:Token 节省与缓存策略
2025-04-10

前言#

Agent 的成本主要来自 Token 消耗。每次工具调用都在消耗 Token。本章讲解如何有效控制成本,从 Token 优化、模型路由、语义缓存到批量处理,全方位降低 Agent 运行费用。

一、成本结构分析#

1.1 Token 成本构成#

@dataclass
class CostBreakdown:
system_prompt: int # 系统提示词(只算一次)
conversation_history: int # 对话历史
tool_result: int # 工具返回结果
final_response: int # 最终响应
# 估算成本
def estimate_cost(breakdown: CostBreakdown) -> float:
return (
breakdown.system_prompt * 3.0 + # $3/1M tokens
breakdown.conversation_history * 3.0 +
breakdown.tool_result * 0.6 + # $0.6/1M
breakdown.final_response * 3.0
) / 1_000_000

1.2 成本热点#

热点占比优化空间
系统提示词~30%Prompt 压缩
工具返回~40%结果缓存
对话历史~20%摘要压缩
最终响应~10%限制长度

1.3 Agent 成本的特殊性#

与传统 API 调用不同,Agent 的单次请求成本差异巨大:

简单问答: 1 次 LLM 调用 → 约 500 tokens → $0.001
工具调用: 3 次 LLM 调用 → 约 3000 tokens → $0.006
深度研究: 10 次 LLM 调用 → 约 15000 tokens → $0.03
多 Agent 协作: 20 次 LLM 调用 → 约 50000 tokens → $0.10

成本随任务复杂度非线性增长。一次深度研究任务的成本可能是简单问答的 100 倍。

1.4 各模型价格对比#

模型输入价格 ($/1M tokens)输出价格 ($/1M tokens)性能等级
GPT-4o2.5010.00
GPT-4o-mini0.150.60
Claude Sonnet 43.0015.00
Claude Haiku 3.50.804.00
DeepSeek Chat0.140.28
DeepSeek Reasoner0.552.19
Gemini 2.0 Flash0.100.40
Gemini 2.5 Pro1.2510.00
本地 Llama 3.3 70B0 (电费)0 (电费)

二、Prompt 压缩#

2.1 系统提示词精简#

# 精简前
system_prompt_old = """
你是一个专业的研究助手。
你擅长信息检索、总结和分析。
你有以下工具可以使用:
1. search - 搜索网络
2. scrape - 抓取网页
3. calculator - 计算
请基于事实回答,不要编造信息。
"""
# 精简后
system_prompt_new = """
你是一个研究助手。使用 search/scrape/calculator。
基于事实回答。
"""

2.2 对话历史压缩#

def compress_history(messages: list, max_tokens: int = 2000) -> list:
"""对话历史压缩"""
if count_tokens(messages) <= max_tokens:
return messages
# 保留系统 + 最近 N 轮
system = messages[0] # 系统消息
recent = messages[-5:] # 最近5轮
# 对旧消息摘要
old_messages = messages[1:-5]
summary = summarize_conversation(old_messages)
return [system, summary] + recent

2.3 自动化 Prompt 压缩#

使用 DSPy 或类似框架自动压缩 Prompt,在保持效果的同时减少 Token:

class PromptCompressor:
"""Prompt 自动压缩器"""
async def compress(self, prompt: str, compression_ratio: float = 0.5) -> str:
"""将 Prompt 压缩到指定比例"""
target_tokens = int(count_tokens(prompt) * compression_ratio)
compressed = await llm.complete(f"""请将以下 Prompt 压缩到约 {target_tokens} tokens,
保持核心指令不变,去除冗余描述。
原始 Prompt:
{prompt}
压缩后 Prompt:""")
return compressed.strip()
async def compress_tool_descriptions(self, tools: list[dict]) -> list[dict]:
"""压缩工具描述"""
compressed = []
for tool in tools:
short_desc = await self.compress(tool["description"], compression_ratio=0.4)
compressed.append({
"name": tool["name"],
"description": short_desc,
"parameters": self._shorten_params(tool["parameters"]),
})
return compressed
def _shorten_params(self, params: dict) -> dict:
"""精简参数定义"""
required = params.get("required", [])
properties = {}
for name, schema in params.get("properties", {}).items():
if name in required:
# 必填参数保留类型和简短描述
properties[name] = {"type": schema.get("type", "string")}
# 可选参数:根据需要保留或移除
return {"type": "object", "required": required, "properties": properties}

2.4 动态 Prompt 选择#

根据查询复杂度动态选择详细的还是精简的 Prompt:

class DynamicPromptSelector:
"""动态 Prompt 选择器"""
def __init__(self):
self.prompts = {
"full": FULL_SYSTEM_PROMPT, # 约 800 tokens
"compact": COMPACT_SYSTEM_PROMPT, # 约 200 tokens
"minimal": MINIMAL_SYSTEM_PROMPT, # 约 50 tokens
}
def select(self, query: str, context: dict) -> str:
complexity = self._estimate_complexity(query, context)
if complexity < 0.3:
return self.prompts["minimal"]
elif complexity < 0.7:
return self.prompts["compact"]
else:
return self.prompts["full"]
def _estimate_complexity(self, query: str, context: dict) -> float:
"""评估查询复杂度 (0-1)"""
score = 0.0
# 长查询更复杂
if len(query) > 200:
score += 0.2
# 需要多步骤的任务更复杂
if any(kw in query for kw in ["分析", "比较", "总结", "研究"]):
score += 0.3
# 需要工具调用的任务更复杂
if context.get("tools_available"):
score += 0.2
# 历史对话长更复杂
if context.get("history_length", 0) > 10:
score += 0.2
return min(score, 1.0)

三、结果缓存#

3.1 工具结果缓存#

from cachetools import cached
import hashlib
@cached(cache={}, key=lambda query: hashlib.md5(query.encode()).hexdigest())
async def cached_search(query: str) -> str:
"""搜索结果缓存"""
return await search(query)

3.2 Embedding 缓存#

from redis import Redis
redis_client = Redis()
def cache_embedding(text: str, embedding: list[float]):
key = f"embedding:{hash(text)}"
redis_client.setex(key, ttl=86400, value=json.dumps(embedding))
def get_cached_embedding(text: str) -> Optional[list[float]]:
key = f"embedding:{hash(text)}"
return redis_client.get(key)

3.3 语义缓存#

语义缓存不是精确匹配,而是根据语义相似度返回缓存结果。这是 Agent 场景下最有效的缓存策略:

flowchart TD A["用户查询"] --> B{"语义缓存命中?<br/>相似度 > 阈值"} B -->|"命中"| C["返回缓存结果<br/>成本: $0"] B -->|"未命中"| D["调用 LLM<br/>成本: $0.01"] D --> E["存入缓存"] E --> F["返回结果"]
import numpy as np
class SemanticCache:
"""语义缓存:基于向量相似度的缓存"""
def __init__(
self,
embedding_model,
similarity_threshold: float = 0.92,
ttl_seconds: int = 3600,
):
self.embedding_model = embedding_model
self.similarity_threshold = similarity_threshold
self.ttl = ttl_seconds
self.cache_store = {} # 生产环境用 Redis + 向量数据库
async def get(self, query: str) -> tuple[str | None, float]:
"""查询缓存,返回 (缓存结果, 相似度)"""
query_embedding = await self.embedding_model.embed(query)
best_match = None
best_similarity = 0.0
for key, entry in self.cache_store.items():
# 检查 TTL
if self._is_expired(entry):
continue
similarity = cosine_similarity(query_embedding, entry["embedding"])
if similarity > best_similarity:
best_similarity = similarity
best_match = entry["result"]
if best_similarity >= self.similarity_threshold:
return best_match, best_similarity
return None, best_similarity
async def set(self, query: str, result: str):
"""存入缓存"""
embedding = await self.embedding_model.embed(query)
self.cache_store[hashlib.md5(query.encode()).hexdigest()] = {
"query": query,
"result": result,
"embedding": embedding,
"timestamp": time.time(),
}
def _is_expired(self, entry: dict) -> bool:
return time.time() - entry["timestamp"] > self.ttl
def cosine_similarity(a: list[float], b: list[float]) -> float:
a_np = np.array(a)
b_np = np.array(b)
return float(np.dot(a_np, b_np) / (np.linalg.norm(a_np) * np.linalg.norm(b_np)))

3.4 多级缓存架构#

生产环境需要多级缓存,从内存到 Redis 再到向量数据库:

class MultiLevelCache:
"""多级缓存"""
def __init__(self):
self.l1 = LRUCache(maxsize=100) # 内存缓存:最快,容量小
self.l2 = RedisCache(ttl=3600) # Redis:精确匹配
self.l3 = VectorDBCache(threshold=0.9) # 向量库:语义匹配
async def get(self, query: str) -> str | None:
# L1: 内存精确匹配
result = self.l1.get(query)
if result:
return result
# L2: Redis 精确匹配
result = await self.l2.get(query)
if result:
self.l1.set(query, result) # 回填 L1
return result
# L3: 向量库语义匹配
result, similarity = await self.l3.search(query)
if result and similarity > self.l3.threshold:
self.l1.set(query, result) # 回填
await self.l2.set(query, result)
return result
return None
async def set(self, query: str, result: str):
"""写入所有缓存层"""
self.l1.set(query, result)
await self.l2.set(query, result)
await self.l3.add(query, result)

3.5 缓存效果预估#

缓存类型命中率节省成本适用场景
精确匹配10-20%10-20%FAQ、重复查询
语义缓存30-50%30-50%客服、知识问答
工具结果缓存40-60%20-40%搜索、数据查询
多级组合50-70%50-70%高流量生产环境

四、智能路由#

4.1 简单 vs 复杂路由#

async def route(query: str) -> str:
"""根据查询复杂度选择处理路径"""
complexity = estimate_complexity(query)
if complexity == "simple":
return await simple_rag(query) # 便宜快速
else:
return await deep_research(query) # 昂贵但深入

4.2 小模型路由#

async def cheap_first(query: str) -> str:
"""先用小模型尝试,失败再调用大模型"""
try:
# 先用便宜模型
result = await llm.mini.complete(query)
if is_satisfactory(result):
return result
except QualityCheckFailed:
pass
# 回退到大模型
return await llm.pro.complete(query)

4.3 完整的智能路由系统#

智能路由是成本优化最有效的手段之一。核心思想:简单任务用便宜模型,复杂任务用贵模型。

from enum import Enum
class ModelTier(Enum):
CHEAP = "cheap" # GPT-4o-mini, Haiku, Flash
STANDARD = "standard" # GPT-4o, Sonnet
PREMIUM = "premium" # Claude Opus, o1
class ModelRouter:
"""智能模型路由器"""
TIER_MODELS = {
ModelTier.CHEAP: ["gpt-4o-mini", "claude-haiku-3.5", "gemini-2.0-flash"],
ModelTier.STANDARD: ["gpt-4o", "claude-sonnet-4", "deepseek-chat"],
ModelTier.PREMIUM: ["claude-opus-4", "o1", "deepseek-reasoner"],
}
def __init__(self):
self.routing_rules = self._load_rules()
self.fallback_enabled = True
async def route(self, query: str, context: dict | None = None) -> tuple[str, ModelTier]:
"""为查询选择最合适的模型"""
complexity = await self._classify_complexity(query, context)
if complexity.score < 0.3:
tier = ModelTier.CHEAP
elif complexity.score < 0.7:
tier = ModelTier.STANDARD
else:
tier = ModelTier.PREMIUM
# 从对应层级中选择模型
model = self._select_from_tier(tier, complexity)
return model, tier
async def _classify_complexity(self, query: str, context: dict | None) -> dict:
"""分类查询复杂度"""
score = 0.0
signals = []
# 信号 1: 查询长度
if len(query) > 500:
score += 0.2
signals.append("长查询")
elif len(query) < 50:
score -= 0.1
signals.append("短查询")
# 信号 2: 是否需要推理
reasoning_keywords = ["分析", "比较", "为什么", "如何", "evaluate", "compare", "why"]
if any(kw in query.lower() for kw in reasoning_keywords):
score += 0.25
signals.append("需要推理")
# 信号 3: 是否需要工具调用
tool_keywords = ["搜索", "查询", "计算", "查找", "search", "calculate"]
if any(kw in query.lower() for kw in tool_keywords):
score += 0.15
signals.append("需要工具")
# 信号 4: 是否涉及代码
code_indicators = ["代码", "函数", "bug", "code", "function", "debug"]
if any(kw in query.lower() for kw in code_indicators):
score += 0.2
signals.append("涉及代码")
# 信号 5: 历史对话轮次
if context and context.get("turn_count", 0) > 5:
score += 0.1
signals.append("多轮对话")
return {
"score": max(0, min(1, score)),
"signals": signals,
}
def _select_from_tier(self, tier: ModelTier, complexity: dict) -> str:
"""从指定层级选择具体模型"""
models = self.TIER_MODELS[tier]
# 简单的轮询策略,生产环境可加入更多考量
return models[0]
class CascadingRouter:
"""级联回退路由器:先用便宜模型,不行再升级"""
def __init__(self, router: ModelRouter):
self.router = router
self.quality_threshold = 0.7
async def route_with_fallback(self, query: str) -> str:
"""带回退的路由"""
# 先尝试便宜模型
model, tier = self.router.route(query, context=None)
if tier == ModelTier.CHEAP:
result = await self._call_model(model, query)
quality = await self._assess_quality(query, result)
if quality >= self.quality_threshold:
return result
# 便宜模型不行,升级到标准模型
model = self.router.TIER_MODELS[ModelTier.STANDARD][0]
result = await self._call_model(model, query)
# 如果标准模型也不行(不太常见),再升级
quality = await self._assess_quality(query, result)
if quality < self.quality_threshold and model not in self.router.TIER_MODELS[ModelTier.PREMIUM]:
premium_model = self.router.TIER_MODELS[ModelTier.PREMIUM][0]
result = await self._call_model(premium_model, query)
return result
async def _call_model(self, model: str, query: str) -> str:
return await llm.complete(query, model=model)
async def _assess_quality(self, query: str, response: str) -> float:
"""快速质量评估(用便宜模型)"""
assessment = await llm.complete(
f"评估以下回答是否充分回答了问题。只输出 0-1 的分数。\n\n"
f"问题: {query}\n回答: {response[:500]}\n\n分数:",
model="gpt-4o-mini", # 用便宜模型做质量检查
)
try:
return float(assessment.strip())
except ValueError:
return 0.5 # 解析失败,默认中等质量

4.4 路由效果预估#

策略成本节省质量影响实现复杂度
固定便宜模型70-80%质量下降
简单/复杂路由40-60%几乎无
级联回退30-50%
LLM 路由(小模型分类)35-55%

五、批量处理与并行化#

5.1 批量请求合并#

将多个独立的 LLM 调用合并为一个批量请求,减少 API 调用开销:

import asyncio
from collections import defaultdict
class BatchProcessor:
"""LLM 批量请求处理器"""
def __init__(self, max_batch_size: int = 10, max_wait_seconds: float = 0.5):
self.max_batch_size = max_batch_size
self.max_wait = max_wait_seconds
self.queue: list[tuple[str, asyncio.Future]] = []
self.lock = asyncio.Lock()
async def submit(self, prompt: str) -> str:
"""提交单个请求,等待批量处理"""
loop = asyncio.get_event_loop()
future = loop.create_future()
async with self.lock:
self.queue.append((prompt, future))
# 队列满了就立即处理
if len(self.queue) >= self.max_batch_size:
await self._flush()
return await future
async def _flush(self):
"""批量发送"""
batch = self.queue[:self.max_batch_size]
self.queue = self.queue[self.max_batch_size:]
prompts = [p for p, _ in batch]
futures = [f for _, f in batch]
try:
# 使用批量 API(如 OpenAI Batch API)
results = await batch_llm_call(prompts)
for future, result in zip(futures, results):
if not future.done():
future.set_result(result)
except Exception as e:
for future in futures:
if not future.done():
future.set_exception(e)
async def start(self):
"""启动定时刷新"""
while True:
await asyncio.sleep(self.max_wait)
async with self.lock:
if self.queue:
await self._flush()

5.2 OpenAI Batch API 使用#

OpenAI 提供了专门的 Batch API,比实时调用便宜 50%:

import json
import time
class OpenAIBatchAPI:
"""OpenAI Batch API 封装"""
def __init__(self, client):
self.client = client
async def submit_batch(self, tasks: list[dict]) -> str:
"""提交批量任务"""
# 创建批量输入文件
batch_lines = []
for i, task in enumerate(tasks):
batch_lines.append(json.dumps({
"custom_id": f"task-{i}",
"method": "POST",
"url": "/v1/chat/completions",
"body": {
"model": task.get("model", "gpt-4o-mini"),
"messages": task["messages"],
"max_tokens": task.get("max_tokens", 1000),
}
}))
# 上传文件
batch_file = self.client.files.create(
file="\n".join(batch_lines),
purpose="batch",
)
# 创建批量任务
batch = self.client.batches.create(
input_file_id=batch_file.id,
endpoint="/v1/chat/completions",
completion_window="24h",
)
return batch.id
async def wait_and_get_results(self, batch_id: str) -> list[dict]:
"""等待批量任务完成并获取结果"""
while True:
batch = self.client.batches.retrieve(batch_id)
if batch.status == "completed":
# 下载结果文件
results = self.client.files.content(batch.output_file_id)
return [json.loads(line) for line in results.text.strip().split("\n")]
elif batch.status == "failed":
raise Exception(f"Batch failed: {batch.errors}")
elif batch.status in ("expired", "cancelled"):
raise Exception(f"Batch {batch.status}")
time.sleep(30) # 每 30 秒检查一次

5.3 并行工具调用#

Agent 在需要调用多个独立工具时,应该并行执行而非串行:

class ParallelToolExecutor:
"""并行工具执行器"""
async def execute_tools(self, tool_calls: list[dict]) -> list[dict]:
"""并行执行独立的工具调用"""
# 分析依赖关系
independent, dependent = self._analyze_dependencies(tool_calls)
# 并行执行独立调用
results = {}
if independent:
tasks = [
self._execute_single(tc)
for tc in independent
]
parallel_results = await asyncio.gather(*tasks, return_exceptions=True)
for tc, result in zip(independent, parallel_results):
results[tc["id"]] = result
# 串行执行有依赖的调用
for tc in dependent:
# 注入之前的结果作为上下文
tc_with_context = self._inject_context(tc, results)
result = await self._execute_single(tc_with_context)
results[tc["id"]] = result
return results
def _analyze_dependencies(self, tool_calls: list[dict]) -> tuple[list, list]:
"""分析工具调用的依赖关系"""
independent = []
dependent = []
for tc in tool_calls:
# 简单启发式:如果参数引用了其他调用的结果,则依赖
params_str = json.dumps(tc.get("params", {}))
if any(f"${{result.{other_id}}}" in params_str for other_id in range(len(tool_calls))):
dependent.append(tc)
else:
independent.append(tc)
return independent, dependent

六、成本监控#

6.1 实时成本仪表板#

# 成本监控指标
agent_cost_total{category="research"} 125.50
agent_cost_per_request 0.15
agent_token_usage{purpose="tool_result"} 45%
agent_cache_hit_rate 0.72

6.2 成本告警#

@dataclass
class CostAlert:
threshold_usd: float = 1.0
window_minutes: int = 60
async def check_cost_alert(cost_breakdown: CostBreakdown):
if cost_breakdown.total_usd > alert.threshold_usd:
await send_alert(
f"Agent 成本超限: ${cost_breakdown.total_usd}"
)

6.3 成本优化 Dashboard#

class CostOptimizationDashboard:
"""成本优化仪表板"""
def generate_report(self, period: str = "daily") -> dict:
return {
"period": period,
"total_cost": self._get_total_cost(period),
"cost_by_agent": self._get_cost_by_agent(period),
"cost_by_model": self._get_cost_by_model(period),
"savings": {
"cache_savings": self._get_cache_savings(period),
"routing_savings": self._get_routing_savings(period),
"compression_savings": self._get_compression_savings(period),
},
"recommendations": self._generate_recommendations(),
}
def _generate_recommendations(self) -> list[str]:
"""生成成本优化建议"""
recs = []
# 检查是否有简单任务使用了昂贵模型
expensive_for_simple = self._check_model_task_mismatch()
if expensive_for_simple:
recs.append(
f"发现 {len(expensive_for_simple)} 个简单任务使用了昂贵模型,"
f"建议启用智能路由,预计节省 ${expensive_for_simple['potential_savings']:.2f}/月"
)
# 检查缓存命中率
cache_hit_rate = self._get_cache_hit_rate()
if cache_hit_rate < 0.3:
recs.append(
f"缓存命中率仅 {cache_hit_rate:.1%},建议优化缓存策略"
)
# 检查 Prompt 冗余
avg_prompt_tokens = self._get_avg_prompt_tokens()
if avg_prompt_tokens > 2000:
recs.append(
f"平均 Prompt 长度 {avg_prompt_tokens} tokens,建议启用 Prompt 压缩"
)
return recs

七、不同规模下的成本估算#

7.1 小规模(个人项目/原型)#

日均请求: 100 次
平均每次: 3 次 LLM 调用, 约 2000 tokens
模型: GPT-4o-mini ($0.15/$0.60 per 1M tokens)
月度估算:
- Token 消耗: 100 * 3 * 2000 * 30 = 18M tokens/月
- 输入成本: ~$2.70/月
- 输出成本: ~$1.08/月
- 总计: ~$5/月

7.2 中等规模(SaaS 产品)#

日均请求: 10,000 次
优化前: GPT-4o 全量, 约 5000 tokens/次
优化后: 70% GPT-4o-mini + 30% GPT-4o, 约 3000 tokens/次
月度估算(优化前):
- 约 $4,500/月
月度估算(优化后):
- 约 $1,200/月
- 节省: 73%

7.3 大规模(企业生产)#

日均请求: 1,000,000 次
策略: 智能路由 + 语义缓存 + 批量处理 + 本地模型兜底
优化措施效果:
- 语义缓存: 命中率 40%, 节省 40%
- 智能路由: 60% 走便宜模型, 节省 45%
- Prompt 压缩: 减少 30% Token
- 批量 API: 减少 50% API 调用成本
- 本地模型: 20% 流量走本地, 成本接近 0
综合优化: 约 85% 成本节省

八、总结#

优化策略节省比例
Prompt 压缩20-40%
结果缓存50-70%
小模型路由30-60%
历史压缩15-30%
批量处理30-50%
并行执行降低延迟

综合可节省 50%+ 成本!

8.1 成本优化实施路线图#

flowchart LR A["第 1 周<br/>接入 Token 监控<br/>了解成本分布"] --> B["第 2 周<br/>实现 Prompt 压缩<br/>+ 工具结果缓存"] B --> C["第 3 周<br/>实现智能路由<br/>+ 语义缓存"] C --> D["第 4 周<br/>批量处理优化<br/>+ 并行化"] D --> E["持续优化<br/>本地模型兜底<br/>+ 成本告警"]

参考资料#

支持与分享

如果这篇文章对你有帮助,欢迎支持作者或分享给更多人

Agent 成本优化:Token 节省与缓存策略
https://blog.souloss.com/posts/machine-learning/agent-guide/agent-cost-optimization/
作者
Souloss
发布于
2025-04-10
许可协议
CC BY-NC-SA 4.0

部分信息可能已经过时