mobile wallpaper 1mobile wallpaper 2mobile wallpaper 3mobile wallpaper 4
1025 字
3 分钟
Agent 可靠性:重试、熔断与降级
2025-04-21

前言#

Agent 依赖外部 API,存在失败风险。本章讲解构建可靠 Agent 系统的关键模式,包括错误处理、重试策略、输入输出验证、降级模式和对抗性鲁棒性。

一、失败类型#

1.1 Agent 常见失败#

失败类型频率影响
API 超时
速率限制
工具执行失败
上下文超限
幻觉严重极高

1.2 失败处理策略#

from enum import Enum
class FailureType(Enum):
RETRYABLE = "retry"
DEGRADABLE = "degrade"
FATAL = "fatal"
FAILURE_STRATEGIES = {
FailureType.RETRYABLE: retry_with_backoff,
FailureType.DEGRADABLE: degrade_to_simple,
FailureType.FATAL: return_error_to_user
}

1.3 Agent 特有的失败模式#

与传统微服务不同,Agent 有一些独特的失败模式:

失败模式表现影响
推理循环Agent 反复执行相同操作Token 消耗飙升
工具调用幻觉调用不存在的工具或编造参数任务失败
上下文窗口溢出对话历史超出 Token 限制API 报错
格式解析失败LLM 输出不符合预期格式后续步骤出错
多 Agent 死锁两个 Agent 互相等待对方结果请求超时
级联幻觉前一步的错误被后续步骤放大最终输出严重失真

二、重试机制#

2.1 指数退避#

import asyncio
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=1, max=10)
)
async def call_with_retry(tool_func, *args, **kwargs):
try:
return await tool_func(*args, **kwargs)
except RateLimitError:
raise RetryableError("Rate limited")

2.2 熔断器#

from circuitbreaker import circuit
@circuit(maximum=10, failure_threshold=5, recovery_timeout=60)
async def protected_call():
return await vulnerable_api_call()

2.3 LLM 调用的智能重试#

LLM 调用失败不同于普通 API 调用。有些失败可以重试,有些需要调整参数:

from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
class LLMAPIError(Exception):
def __init__(self, error_type: str, message: str):
self.error_type = error_type
super().__init__(message)
class ContextLengthExceeded(LLMAPIError):
def __init__(self, requested: int, limit: int):
super().__init__("context_length_exceeded", f"需要 {requested} tokens,上限 {limit}")
self.requested_tokens = requested
self.max_tokens = limit
class RateLimitExceeded(LLMAPIError):
def __init__(self, retry_after: float):
super().__init__("rate_limit", "请求过快")
self.retry_after = retry_after
class SmartLLMRetry:
"""LLM 智能重试:根据错误类型采取不同策略"""
def __init__(self, max_retries: int = 3):
self.max_retries = max_retries
async def call(self, prompt: str, model: str = "gpt-4o", **kwargs) -> str:
"""带智能重试的 LLM 调用"""
current_prompt = prompt
current_model = model
for attempt in range(self.max_retries):
try:
return await self._do_call(current_prompt, current_model, **kwargs)
except ContextLengthExceeded as e:
# 策略 1: 截断 Prompt
if attempt == 0:
current_prompt = self._truncate_prompt(current_prompt, int(e.max_tokens * 0.8))
continue
# 策略 2: 换用更大上下文的模型
if attempt == 1:
current_model = self._get_larger_context_model(current_model)
continue
# 策略 3: 压缩历史
current_prompt = await self._compress_prompt(current_prompt)
continue
except RateLimitExceeded as e:
# 等待后重试
await asyncio.sleep(e.retry_after)
continue
except LLMAPIError as e:
if e.error_type in ("invalid_request", "authentication"):
# 不可重试的错误,立即抛出
raise
# 其他错误正常重试
await asyncio.sleep(2 ** attempt)
continue
raise Exception(f"LLM 调用在 {self.max_retries} 次重试后仍然失败")
async def _do_call(self, prompt: str, model: str, **kwargs) -> str:
"""执行 LLM 调用"""
try:
return await llm.complete(prompt, model=model, **kwargs)
except Exception as e:
error = self._parse_error(e)
raise error from e
def _truncate_prompt(self, prompt: str, target_tokens: int) -> str:
"""截断 Prompt 到目标 Token 数"""
current_tokens = count_tokens(prompt)
if current_tokens <= target_tokens:
return prompt
ratio = target_tokens / current_tokens
cut_point = int(len(prompt) * ratio)
return prompt[:cut_point] + "\n\n[内容已截断]"
def _get_larger_context_model(self, current: str) -> str:
"""获取上下文更大的替代模型"""
upgrades = {
"gpt-4o-mini": "gpt-4o",
"claude-haiku-3.5": "claude-sonnet-4",
"gemini-2.0-flash": "gemini-2.5-pro",
}
return upgrades.get(current, current)

2.4 重试策略对比#

策略适用场景优点缺点
固定间隔重试简单瞬时错误实现简单可能加剧限流
指数退避速率限制、服务端错误自适应总等待时间较长
抖动退避高并发场景避免重试风暴实现稍复杂
智能重试LLM 特有错误针对性强需要错误分类
# 抖动退避实现
import random
async def retry_with_jitter(
func,
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 30.0,
):
"""带抖动的指数退避"""
for attempt in range(max_retries):
try:
return await func()
except Exception as e:
if attempt == max_retries - 1:
raise
# 指数退避 + 随机抖动
delay = min(base_delay * (2 ** attempt), max_delay)
jitter = random.uniform(0, delay * 0.5)
await asyncio.sleep(delay + jitter)

三、降级策略#

3.1 多级降级#

async def degrade_to_simple(query: str) -> str:
"""降级到简单模式"""
# 第一级:简单 RAG
try:
return await simple_rag(query)
except:
pass
# 第二级:关键词匹配
try:
return await keyword_search(query)
except:
pass
# 第三级:返回预设答案
return "抱歉,暂时无法回答您的问题。"

3.2 功能降级#

graph TD A["用户请求"] --> B{"功能正常?"} B -->|"是"| C["完整 Agent"] B -->|"否"| D{"工具可用?"} D -->|"是"| E["简化版 Agent"] D -->|"否"| F["FAQ 机器人"]

3.3 结构化降级框架#

生产环境的降级需要根据依赖状态动态调整,而非简单的 try-except 嵌套:

from dataclasses import dataclass
from enum import Enum
class ServiceLevel(Enum):
FULL = "full" # 完整功能
DEGRADED = "degraded" # 部分功能
MINIMAL = "minimal" # 最小功能
OFFLINE = "offline" # 仅返回静态内容
@dataclass
class ServiceHealth:
llm_available: bool = True
search_available: bool = True
database_available: bool = True
cache_available: bool = True
class GracefulDegradation:
"""优雅降级框架"""
def __init__(self):
self.health = ServiceHealth()
self.degradation_rules = {
# (缺失的服务, 降级策略)
"no_search": {
"level": ServiceLevel.DEGRADED,
"fallback": "knowledge_base_only",
"message": "搜索功能暂不可用,基于知识库回答",
},
"no_llm": {
"level": ServiceLevel.MINIMAL,
"fallback": "template_responses",
"message": "AI 服务暂不可用,返回预设回答",
},
"no_database": {
"level": ServiceLevel.DEGRADED,
"fallback": "llm_knowledge_only",
"message": "数据库暂不可用,基于模型知识回答",
},
}
def determine_level(self) -> tuple[ServiceLevel, list[str]]:
"""根据健康状况确定服务级别"""
missing = []
if not self.health.llm_available:
missing.append("no_llm")
if not self.health.search_available:
missing.append("no_search")
if not self.health.database_available:
missing.append("no_database")
if not missing:
return ServiceLevel.FULL, []
# 取最严重的降级级别
levels = [self.degradation_rules[m]["level"] for m in missing]
severity = {ServiceLevel.FULL: 0, ServiceLevel.DEGRADED: 1, ServiceLevel.MINIMAL: 2}
worst = max(levels, key=lambda l: severity[l])
return worst, missing
async def handle_request(self, query: str) -> dict:
"""根据当前服务级别处理请求"""
level, missing = self.determine_level()
if level == ServiceLevel.FULL:
result = await self._full_service(query)
elif level == ServiceLevel.DEGRADED:
result = await self._degraded_service(query, missing)
elif level == ServiceLevel.MINIMAL:
result = await self._minimal_service(query)
else:
result = self._offline_response()
result["service_level"] = level.value
return result
async def _full_service(self, query: str) -> dict:
"""完整服务"""
response = await agent.run(query)
return {"response": response, "quality": "high"}
async def _degraded_service(self, query: str, missing: list[str]) -> dict:
"""降级服务"""
messages = [self.degradation_rules[m]["message"] for m in missing]
if "no_search" in missing:
# 搜索不可用,用知识库
response = await knowledge_base_rag(query)
elif "no_database" in missing:
# 数据库不可用,靠 LLM 知识
response = await llm.complete(query)
else:
response = await agent.run(query)
return {"response": response, "quality": "medium", "warnings": messages}
async def _minimal_service(self, query: str) -> dict:
"""最小服务"""
# 模板匹配或 FAQ
response = match_faq(query) or "服务暂时受限,请稍后再试。"
return {"response": response, "quality": "low"}
def _offline_response(self) -> dict:
return {"response": "系统维护中,请稍后再试。", "quality": "none"}

四、输入验证与输出解析#

4.1 输入验证#

Agent 的输入来自用户,可能包含格式错误、恶意内容或超出处理能力的内容:

import re
from dataclasses import dataclass
@dataclass
class ValidationResult:
is_valid: bool
errors: list[str]
warnings: list[str]
sanitized_input: str | None = None
class InputValidator:
"""Agent 输入验证器"""
MAX_INPUT_LENGTH = 10000
MAX_TOOL_PARAMS = 20
def validate(self, user_input: str) -> ValidationResult:
errors = []
warnings = []
# 1. 长度检查
if len(user_input) > self.MAX_INPUT_LENGTH:
errors.append(f"输入过长: {len(user_input)} > {self.MAX_INPUT_LENGTH}")
elif len(user_input) > self.MAX_INPUT_LENGTH * 0.8:
warnings.append("输入接近长度上限,可能影响处理效果")
# 2. 空输入检查
if not user_input.strip():
errors.append("输入为空")
# 3. 注入检测
injection_score = self._detect_injection(user_input)
if injection_score > 0.8:
errors.append("输入包含疑似注入内容")
elif injection_score > 0.5:
warnings.append("输入包含可疑内容")
# 4. 编码检查
if not self._is_valid_encoding(user_input):
errors.append("输入包含无效字符")
# 5. 语言检查(可选)
if self._contains_mixed_scripts(user_input):
warnings.append("输入包含混合文字,可能影响理解")
sanitized = self._sanitize(user_input) if not errors else None
return ValidationResult(
is_valid=len(errors) == 0,
errors=errors,
warnings=warnings,
sanitized_input=sanitized,
)
def _detect_injection(self, text: str) -> float:
"""检测注入攻击(返回 0-1 的风险分数)"""
injection_patterns = [
r"忽略.{0,5}(之前的|上面|所有|全部).{0,5}(指令|规则|提示)",
r"(forget|ignore|disregard).{0,10}(previous|above|all).{0,10}(instructions|rules)",
r"你是一个",
r"system:",
r"<\|im_start\|>",
r"\\n\\n",
]
matches = sum(1 for p in injection_patterns if re.search(p, text, re.IGNORECASE))
return min(matches / len(injection_patterns), 1.0)
def _is_valid_encoding(self, text: str) -> bool:
try:
text.encode("utf-8")
return True
except UnicodeEncodeError:
return False
def _contains_mixed_scripts(self, text: str) -> bool:
has_cjk = any("\u4e00" <= c <= "\u9fff" for c in text)
has_cyrillic = any("\u0400" <= c <= "\u04ff" for c in text)
has_arabic = any("\u0600" <= c <= "\u06ff" for c in text)
scripts = [has_cjk, has_cyrillic, has_arabic]
return sum(scripts) > 1
def _sanitize(self, text: str) -> str:
"""清理输入"""
# 移除控制字符
sanitized = re.sub(r"[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]", "", text)
# 规范化空白
sanitized = re.sub(r"\s+", " ", sanitized).strip()
return sanitized

4.2 输出解析可靠性#

LLM 的输出格式不稳定是 Agent 系统的常见痛点。以下是健壮的输出解析策略:

import json
import re
class RobustOutputParser:
"""健壮的 LLM 输出解析器"""
async def parse_json(self, text: str) -> dict | None:
"""从 LLM 输出中可靠地解析 JSON"""
# 策略 1: 直接解析
try:
return json.loads(text)
except json.JSONDecodeError:
pass
# 策略 2: 提取 ```json ... ``` 代码块
json_match = re.search(r"```(?:json)?\s*\n?(.*?)\n?```", text, re.DOTALL)
if json_match:
try:
return json.loads(json_match.group(1))
except json.JSONDecodeError:
pass
# 策略 3: 找到第一个 { 和最后一个 }
first_brace = text.find("{")
last_brace = text.rfind("}")
if first_brace != -1 and last_brace != -1:
try:
return json.loads(text[first_brace:last_brace + 1])
except json.JSONDecodeError:
pass
# 策略 4: 让 LLM 自修复
return await self._llm_repair_json(text)
async def _llm_repair_json(self, broken_text: str) -> dict | None:
"""用 LLM 修复损坏的 JSON"""
repair_prompt = f"""以下文本应该是一个 JSON 对象,但格式有问题。请修复并输出有效的 JSON。
原始文本:
{broken_text}
修复后的 JSON:"""
for attempt in range(2):
try:
repaired = await llm.complete(repair_prompt)
# 提取并解析
return json.loads(repaired)
except:
continue
return None
def parse_action(self, text: str) -> dict:
"""从 ReAct 格式的文本中提取 Action"""
# 匹配多种格式
patterns = [
r"Action:\s*(\w+)\s*\((.*)\)", # Action: search(query="test")
r"Action:\s*(\w+)\s*\[(.*)\]", # Action: search[query="test"]
r"```(?:json)?\s*\n?{{.*?\"action\":\s*\"(\w+)\".*?\"input\":\s*({.*?})",
]
for pattern in patterns:
match = re.search(pattern, text, re.DOTALL)
if match:
return {
"tool": match.group(1),
"input": self._parse_action_input(match.group(2)),
}
# 无法解析,返回空 Action
return {"tool": None, "input": {}}
def parse_final_answer(self, text: str) -> str:
"""提取 Final Answer"""
patterns = [
r"Final Answer:\s*(.*)",
r"最终答案[::]\s*(.*)",
r"答案[::]\s*(.*)",
]
for pattern in patterns:
match = re.search(pattern, text, re.DOTALL)
if match:
return match.group(1).strip()
# 没有明确的 Final Answer 标记,返回全文
return text.strip()

4.3 输出验证#

解析之后还需要验证输出是否合理:

class OutputValidator:
"""输出验证器"""
def validate(self, response: str, schema: dict | None = None) -> dict:
issues = []
# 1. 空响应检查
if not response or not response.strip():
issues.append({"type": "empty", "severity": "critical"})
# 2. 长度检查
if len(response) > 50000:
issues.append({"type": "too_long", "severity": "warning"})
elif len(response) < 10:
issues.append({"type": "too_short", "severity": "warning"})
# 3. 重复检查(LLM 有时会重复内容)
if self._has_excessive_repetition(response):
issues.append({"type": "repetition", "severity": "warning"})
# 4. 有害内容检查
if self._contains_harmful_content(response):
issues.append({"type": "harmful", "severity": "critical"})
# 5. Schema 验证(如果指定)
if schema:
schema_issues = self._validate_schema(response, schema)
issues.extend(schema_issues)
return {
"is_valid": not any(i["severity"] == "critical" for i in issues),
"issues": issues,
}
def _has_excessive_repetition(self, text: str) -> bool:
"""检测过度的内容重复"""
sentences = text.split("。")
if len(sentences) < 3:
return False
# 检查是否有超过 3 个相同的句子
from collections import Counter
counter = Counter(s.strip() for s in sentences if s.strip())
return any(count > 3 for count in counter.values())
def _contains_harmful_content(self, text: str) -> bool:
"""简单的有害内容检测"""
harmful_patterns = [
r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b", # 电话号码
r"\b\d{3}[-.]?\d{2}[-.]?\d{4}\b", # SSN
r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}", # 邮箱
]
return any(re.search(p, text) for p in harmful_patterns)

五、限流保护#

5.1 Token 速率限制#

import time
from collections import deque
class TokenRateLimiter:
def __init__(self, max_tokens: int, window_seconds: int):
self.max_tokens = max_tokens
self.window = window_seconds
self.requests = deque()
async def acquire(self, tokens: int):
now = time.time()
# 清理过期请求
while self.requests and now - self.requests[0] > self.window:
self.requests.popleft()
if sum(self.requests) + tokens > self.max_tokens:
wait_time = self.window - (now - self.requests[0])
await asyncio.sleep(wait_time)
self.requests.append(tokens)

5.2 并发限制#

from asyncio import Semaphore
MAX_CONCURRENT = 10
semaphore = Semaphore(MAX_CONCURRENT)
async def limited_agent_call(query: str):
async with semaphore:
return await agent.process(query)

5.3 分优先级的限流#

不同类型的请求需要不同的限流策略:

from dataclasses import dataclass
from enum import Enum
class Priority(Enum):
CRITICAL = 0 # 付费用户、关键业务
NORMAL = 1 # 普通用户
LOW = 2 # 后台任务、批量处理
@dataclass
class RateLimitConfig:
max_concurrent: int
max_rpm: int # 每分钟请求数
max_tpm: int # 每分钟 Token 数
PRIORITY_LIMITS = {
Priority.CRITICAL: RateLimitConfig(max_concurrent=20, max_rpm=120, max_tpm=200000),
Priority.NORMAL: RateLimitConfig(max_concurrent=10, max_rpm=60, max_tpm=100000),
Priority.LOW: RateLimitConfig(max_concurrent=3, max_rpm=10, max_tpm=30000),
}
class PriorityRateLimiter:
"""分优先级的限流器"""
def __init__(self):
self.semaphores = {
p: asyncio.Semaphore(c.max_concurrent)
for p, c in PRIORITY_LIMITS.items()
}
self.request_counts = {p: deque() for p in Priority}
async def acquire(self, priority: Priority, estimated_tokens: int = 0):
"""获取执行许可"""
config = PRIORITY_LIMITS[priority]
# 并发限制
await self.semaphores[priority].acquire()
# RPM 限制
now = time.time()
self._cleanup_old(self.request_counts[priority], window=60)
if len(self.request_counts[priority]) >= config.max_rpm:
wait_time = 60 - (now - self.request_counts[priority][0])
await asyncio.sleep(wait_time)
self.request_counts[priority].append(now)
def release(self, priority: Priority):
self.semaphores[priority].release()

六、健康检查#

6.1 Agent 健康指标#

@dataclass
class AgentHealth:
success_rate: float # > 0.95
avg_latency_ms: float # < 2000
error_rate_by_type: dict
cache_hit_rate: float # > 0.5

6.2 自动恢复#

async def health_check_loop():
while True:
health = await check_agent_health()
if health.success_rate < 0.8:
await scale_up()
elif health.success_rate < 0.5:
await circuit_break()
await asyncio.sleep(30)

6.3 完整的健康检查系统#

from datetime import datetime, timedelta
@dataclass
class HealthCheckResult:
status: str # healthy / degraded / unhealthy
checks: dict[str, bool]
latency_ms: float
last_error: str | None
timestamp: datetime
class AgentHealthChecker:
"""Agent 健康检查系统"""
def __init__(self):
self.history: list[HealthCheckResult] = []
self.alert_handlers: list[callable] = []
async def check(self) -> HealthCheckResult:
"""执行完整健康检查"""
checks = {}
start_time = time.time()
# 检查 1: LLM API 可用性
checks["llm_api"] = await self._check_llm_api()
# 检查 2: 工具服务可用性
checks["tools"] = await self._check_tools()
# 检查 3: 数据库连接
checks["database"] = await self._check_database()
# 检查 4: 缓存服务
checks["cache"] = await self._check_cache()
# 检查 5: 最近错误率
checks["error_rate"] = self._check_error_rate()
latency = (time.time() - start_time) * 1000
all_healthy = all(checks.values())
mostly_healthy = sum(checks.values()) >= len(checks) * 0.6
status = "healthy" if all_healthy else ("degraded" if mostly_healthy else "unhealthy")
result = HealthCheckResult(
status=status,
checks=checks,
latency_ms=latency,
last_error=self._get_last_error(),
timestamp=datetime.now(),
)
self.history.append(result)
# 触发告警
if status != "healthy":
for handler in self.alert_handlers:
await handler(result)
return result
async def _check_llm_api(self) -> bool:
"""检查 LLM API 是否可用"""
try:
response = await asyncio.wait_for(
llm.complete("Hello, respond with 'OK'."),
timeout=10.0,
)
return "ok" in response.lower()
except Exception:
return False
async def _check_tools(self) -> bool:
"""检查核心工具是否可用"""
try:
result = await search_tool("test")
return result is not None
except Exception:
return False
async def _check_database(self) -> bool:
"""检查数据库连接"""
try:
await db.execute("SELECT 1")
return True
except Exception:
return False
async def _check_cache(self) -> bool:
"""检查缓存服务"""
try:
redis_client.ping()
return True
except Exception:
return False
def _check_error_rate(self) -> bool:
"""检查最近 10 分钟的错误率"""
cutoff = datetime.now() - timedelta(minutes=10)
recent = [r for r in self.history if r.timestamp > cutoff]
if not recent:
return True
error_rate = sum(1 for r in recent if r.status != "healthy") / len(recent)
return error_rate < 0.3

七、对抗性鲁棒性#

7.1 Red Teaming 概念#

Red Teaming 是系统化地寻找 Agent 漏洞的方法。对 Agent 来说,主要关注以下攻击面:

flowchart TD A["Agent 攻击面"] --> B["用户输入"] A --> C["工具返回"] A --> D["记忆系统"] A --> E["Agent 间通信"] B --> B1["提示注入"] B --> B2["越狱"] C --> C1["工具投毒"] D --> D1["记忆污染"] E --> E1["消息伪造"]

7.2 常见对抗性攻击及防御#

class AdversarialDefense:
"""对抗性防御"""
def __init__(self):
self.max_turns = 20
self.budget_per_user = 100 # 每用户每小时的 Token 预算
def check_input_safety(self, user_input: str) -> dict:
"""输入安全检查"""
risks = []
# 检查 1: 提示注入
if self._detect_prompt_injection(user_input):
risks.append({"type": "prompt_injection", "severity": "high"})
# 检查 2: 越狱尝试
if self._detect_jailbreak(user_input):
risks.append({"type": "jailbreak", "severity": "high"})
# 检查 3: 敏感信息请求
if self._detect_sensitive_request(user_input):
risks.append({"type": "sensitive_request", "severity": "medium"})
return {
"is_safe": not any(r["severity"] == "high" for r in risks),
"risks": risks,
}
def _detect_prompt_injection(self, text: str) -> bool:
patterns = [
r"忽略.*指令",
r"forget.*instructions",
r"new instructions",
r"system\s*:",
r"<\|im_start\|>",
]
return any(re.search(p, text, re.IGNORECASE) for p in patterns)
def _detect_jailbreak(self, text: str) -> bool:
patterns = [
r"DAN\s+mode",
r"developer\s+mode",
r"jailbreak",
r"越狱",
]
return any(re.search(p, text, re.IGNORECASE) for p in patterns)
def _detect_sensitive_request(self, text: str) -> bool:
patterns = [
r"系统.*(提示|prompt)",
r"(password|secret|api.?key)",
r"数据库.*(密码|连接串)",
]
return any(re.search(p, text, re.IGNORECASE) for p in patterns)

7.3 输出安全过滤#

class OutputSafetyFilter:
"""输出安全过滤器"""
SENSITIVE_PATTERNS = [
(r"sk-[a-zA-Z0-9]{32,}", "[API_KEY_REDACTED]"),
(r"\b\d{16,19}\b", "[CARD_NUMBER_REDACTED]"),
(r"\b\d{3}-\d{2}-\d{4}\b", "[SSN_REDACTED]"),
(r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}", "[EMAIL_REDACTED]"),
]
def filter(self, response: str) -> str:
"""过滤敏感信息"""
for pattern, replacement in self.SENSITIVE_PATTERNS:
response = re.sub(pattern, replacement, response)
return response

八、总结#

模式用途效果
重试 + 退避瞬时失败恢复 30%
熔断器级联失败防止崩溃
降级部分失败保持可用
限流过载保护稳定服务
输入验证恶意输入防止攻击
输出验证格式错误提高成功率

8.1 可靠性设计清单#

  • 重试策略:所有 LLM 调用和工具调用都应有指数退避重试
  • 熔断保护:对外部 API 设置熔断器,防止级联故障
  • 降级方案:为每个核心功能准备至少一级降级策略
  • 输入验证:过滤恶意输入,限制输入长度和格式
  • 输出验证:检查 LLM 输出格式、长度和内容安全性
  • 限流保护:按优先级控制并发和速率
  • 健康检查:定时检测各组件状态,自动告警和恢复
  • 对抗防御:防范提示注入、越狱和工具投毒

可靠性是生产 Agent 系统的基石!

参考资料#

支持与分享

如果这篇文章对你有帮助,欢迎支持作者或分享给更多人

Agent 可靠性:重试、熔断与降级
https://blog.souloss.com/posts/machine-learning/agent-guide/agent-reliability-design/
作者
Souloss
发布于
2025-04-21
许可协议
CC BY-NC-SA 4.0

部分信息可能已经过时