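Agent Reliability: Retries, Circuit Breakers, and Degradation
Preface
Agents depend on external APIs, and those calls can fail. This chapter covers the key patterns for building reliable agent systems: error handling, retry strategies, input and output validation, degradation modes, and adversarial robustness.
1. Failure Types
1.1 Common Agent Failures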
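| Failure type | Frequency | Impact |
|---|---|---|
| API timeout | High | Medium |
| Rate limiting | Medium | High |
| Tool execution failure | Medium | Medium |
| Context limit exceeded | Low | High |
| Severe hallucination | Low | Very high |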
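API timeouts are the most frequent failure in the table above. One minimal way to bound them is the standard library's `asyncio.wait_for`. The sketch below is only an illustration: `ToolTimeoutError`, `call_with_timeout`, `slow_tool`, and `demo` are hypothetical names, not part of any agent framework.

```python
import asyncio


class ToolTimeoutError(Exception):
    """Raised when a call exceeds its time budget (hypothetical name)."""


async def call_with_timeout(func, *args, timeout_s: float = 10.0, **kwargs):
    """Await func(*args, **kwargs), failing fast after timeout_s seconds."""
    try:
        return await asyncio.wait_for(func(*args, **kwargs), timeout=timeout_s)
    except asyncio.TimeoutError as e:
        raise ToolTimeoutError(f"call exceeded {timeout_s}s") from e


async def slow_tool(delay: float) -> str:
    """Stand-in for an external API call that takes `delay` seconds."""
    await asyncio.sleep(delay)
    return "done"


async def demo() -> tuple:
    # A fast call completes normally.
    fast = await call_with_timeout(slow_tool, 0.01, timeout_s=1.0)
    # A slow call is cancelled once the budget is exhausted.
    try:
        slow = await call_with_timeout(slow_tool, 1.0, timeout_s=0.05)
    except ToolTimeoutError:
        slow = "timed out"
    return fast, slow


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Converting the timeout into a dedicated exception type lets later layers decide whether to retry or degrade without inspecting asyncio internals.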
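1.2 Failure Handling Strategies
```python
from enum import Enum


class FailureType(Enum):
    RETRYABLE = "retry"      # transient; worth retrying
    DEGRADABLE = "degrade"   # fall back to a simpler path
    FATAL = "fatal"          # surface the error to the user


# Map each failure type to its handler. The three handlers are assumed to be
# defined elsewhere (degrade_to_simple appears in section 3.1).
FAILURE_STRATEGIES = {
    FailureType.RETRYABLE: retry_with_backoff,
    FailureType.DEGRADABLE: degrade_to_simple,
    FailureType.FATAL: return_error_to_user,
}
```
1.3 Agent-Specific Failure Modes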
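Unlike traditional microservices, agents have some failure modes of their own:
| Failure mode | Symptom | Impact |
|---|---|---|
| Reasoning loop | The agent repeats the same action over and over | Token consumption spikes |
| Tool-call hallucination | Calls a nonexistent tool or fabricates arguments | Task fails |
| Context window overflow | Conversation history exceeds the token limit | API returns an error |
| Format parsing failure | LLM output does not match the expected format | Downstream steps fail |
| Multi-agent deadlock | Two agents wait on each other's results | Request times out |
| Cascading hallucination | An error in one step is amplified by later steps | Final output is badly distorted |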
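Two of the failure modes above, reasoning loops and tool-call hallucination, can be caught with cheap guards before a tool is executed. The following is a minimal sketch under stated assumptions: `LoopDetector`, `validate_tool_call`, and the `registry` shape (tool name mapped to its required parameter names) are hypothetical and not taken from any framework.

```python
import json
from collections import deque


class LoopDetector:
    """Flags a reasoning loop when the same tool call repeats too often."""

    def __init__(self, window: int = 6, max_repeats: int = 3):
        self.recent = deque(maxlen=window)  # fingerprints of the last `window` calls
        self.max_repeats = max_repeats

    def record(self, tool: str, args: dict) -> bool:
        """Record a call; return True if it now looks like a loop."""
        fingerprint = (tool, json.dumps(args, sort_keys=True))
        self.recent.append(fingerprint)
        return self.recent.count(fingerprint) >= self.max_repeats


def validate_tool_call(tool: str, args: dict, registry: dict) -> list:
    """Return a list of problems; an empty list means the call is well-formed."""
    if tool not in registry:
        return [f"unknown tool: {tool}"]
    missing = set(registry[tool]) - set(args)
    return [f"missing parameter: {name}" for name in sorted(missing)]


if __name__ == "__main__":
    detector = LoopDetector()
    for _ in range(3):
        looping = detector.record("search", {"q": "weather"})
    print("loop detected:", looping)
    print(validate_tool_call("delete_all", {}, {"search": {"q"}}))
```

When `record` returns True, the agent loop can stop and either ask the user for guidance or fall back to a simpler mode instead of burning further tokens.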
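2. Retry Mechanisms
2.1 Exponential Backoff
```python
from tenacity import (
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential,
)


class RateLimitError(Exception):
    """Placeholder for the rate-limit error raised by your API client."""


class RetryableError(Exception):
    """Marks a failure that is safe to retry."""


@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10),
    retry=retry_if_exception_type(RetryableError),  # retry only retryable failures
    reraise=True,  # re-raise the last error instead of tenacity's RetryError
)
async def call_with_retry(tool_func, *args, **kwargs):
    try:
        return await tool_func(*args, **kwargs)
    except RateLimitError as e:
        raise RetryableError("Rate limited") from e
```
2.2 Circuit Breaker
```python
from circuitbreaker import circuit


# Open the circuit after 5 failures in a row; allow a trial call after 60 seconds.
@circuit(failure_threshold=5, recovery_timeout=60)
async def protected_call():
    # vulnerable_api_call is a placeholder for the real external call
    return await vulnerable_api_call()
```
2.3 Smart Retries for LLM Calls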
A failed LLM call is not like a failed call to an ordinary API. Some failures can simply be retried; others require adjusting the request first:
```python
import asyncio


class LLMAPIError(Exception):
    def __init__(self, error_type: str, message: str):
        self.error_type = error_type
        super().__init__(message)


class ContextLengthExceeded(LLMAPIError):
    def __init__(self, requested: int, limit: int):
        super().__init__(
            "context_length_exceeded",
            f"requested {requested} tokens, limit is {limit}",
        )
        self.requested_tokens = requested
        self.max_tokens = limit


class RateLimitExceeded(LLMAPIError):
    def __init__(self, retry_after: float):
        super().__init__("rate_limit", "too many requests")
        self.retry_after = retry_after


class SmartLLMRetry:
    """Smart LLM retry: choose a strategy based on the error type.

    `llm`, `count_tokens`, `_parse_error`, and `_compress_prompt` are assumed
    to be provided elsewhere.
    """

    def __init__(self, max_retries: int = 3):
        self.max_retries = max_retries

    async def call(self, prompt: str, model: str = "gpt-4o", **kwargs) -> str:
        """Call the LLM with error-aware retries."""
        current_prompt = prompt
        current_model = model
        context_fixes = 0  # context-length remedies applied so far
        last_error = None

        # One initial attempt plus up to max_retries retries, so the remedy
        # chosen after a failure is always followed by another attempt.
        for attempt in range(self.max_retries + 1):
            try:
                return await self._do_call(current_prompt, current_model, **kwargs)

            except ContextLengthExceeded as e:
                last_error = e
                if attempt == self.max_retries:
                    break
                if context_fixes == 0:
                    # Strategy 1: truncate the prompt
                    current_prompt = self._truncate_prompt(
                        current_prompt, int(e.max_tokens * 0.8)
                    )
                elif context_fixes == 1:
                    # Strategy 2: switch to a model with a larger context window
                    current_model = self._get_larger_context_model(current_model)
                else:
                    # Strategy 3: compress the history
                    current_prompt = await self._compress_prompt(current_prompt)
                context_fixes += 1

            except RateLimitExceeded as e:
                last_error = e
                if attempt == self.max_retries:
                    break
                # Wait as long as the server asked, then retry
                await asyncio.sleep(e.retry_after)

            except LLMAPIError as e:
                if e.error_type in ("invalid_request", "authentication"):
                    # Not retryable: raise immediately
                    raise
                last_error = e
                if attempt == self.max_retries:
                    break
                # Any other error: plain exponential backoff
                await asyncio.sleep(2 ** attempt)

        raise RuntimeError(
            f"LLM call still failing after {self.max_retries} retries"
        ) from last_error

    async def _do_call(self, prompt: str, model: str, **kwargs) -> str:
        """Perform the LLM call and normalize provider errors."""
        try:
            return await llm.complete(prompt, model=model, **kwargs)
        except Exception as e:
            raise self._parse_error(e) from e

    def _truncate_prompt(self, prompt: str, target_tokens: int) -> str:
        """Truncate the prompt to roughly target_tokens tokens."""
        current_tokens = count_tokens(prompt)
        if current_tokens <= target_tokens:
            return prompt
        # Approximate the cut point by character ratio
        ratio = target_tokens / current_tokens
        cut_point = int(len(prompt) * ratio)
        return prompt[:cut_point] + "\n\n[content truncated]"

    def _get_larger_context_model(self, current: str) -> str:
        """Return a fallback model intended to offer a larger context window."""
        # Illustrative mapping; check each provider's current context limits
        # before relying on it.
        upgrades = {
            "gpt-4o-mini": "gpt-4o",
            "claude-haiku-3.5": "claude-sonnet-4",
            "gemini-2.0-flash": "gemini-2.5-pro",
        }
        return upgrades.get(current, current)
```
2.4 Retry Strategy Comparison
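| Strategy | Best for | Pros | Cons |
|---|---|---|---|
| Fixed-interval retry | Simple transient errors | Simple to implement | Can make rate limiting worse |
| Exponential backoff | Rate limits, server-side errors | Adaptive | Longer total wait |
| Jittered backoff | High-concurrency workloads | Avoids retry storms | Slightly more complex |
| Smart retry | LLM-specific errors | Targeted | Requires error classification |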
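To make the comparison concrete, the sketch below computes the wait schedule each of the first three strategies would produce. The function names and default values (1-second base, 30-second cap, jitter of up to 50% of the delay) are assumptions chosen for illustration.

```python
import random


def fixed_delays(retries: int, delay: float = 1.0) -> list:
    """Same wait before every retry."""
    return [delay] * retries


def exponential_delays(retries: int, base: float = 1.0, cap: float = 30.0) -> list:
    """Wait doubles each time, up to `cap` seconds."""
    return [min(base * 2 ** i, cap) for i in range(retries)]


def jittered_delays(retries: int, base: float = 1.0, cap: float = 30.0, rng=None) -> list:
    """Exponential schedule plus random jitter of up to 50% of each delay."""
    rng = rng or random.Random()
    return [d + rng.uniform(0, d * 0.5) for d in exponential_delays(retries, base, cap)]


if __name__ == "__main__":
    print(fixed_delays(5))        # [1.0, 1.0, 1.0, 1.0, 1.0]
    print(exponential_delays(6))  # [1.0, 2.0, 4.0, 8.0, 16.0, 30.0]
    print(jittered_delays(6))     # differs per run; clients no longer retry in lockstep
```

With many clients failing at the same moment, the fixed and exponential schedules make them all retry at identical instants, while the jittered schedule spreads the retries out.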
```python
# Jittered backoff implementation
import asyncio
import random


async def retry_with_jitter(
    func,
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
):
    """Exponential backoff with jitter."""
    for attempt in range(max_retries):
        try:
            return await func()
        except Exception:
            if attempt == max_retries - 1:
                raise

            # Exponential backoff plus random jitter of up to 50% of the delay
            delay = min(base_delay * (2 ** attempt), max_delay)
            jitter = random.uniform(0, delay * 0.5)
            await asyncio.sleep(delay + jitter)
```
3. Degradation Strategies
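3.1 Multi-Level Degradation
```python
async def degrade_to_simple(query: str) -> str:
    """Fall back through progressively simpler modes.

    simple_rag and keyword_search are placeholders for your own implementations.
    """
    # Level 1: simple RAG
    try:
        return await simple_rag(query)
    except Exception:
        pass  # fall through to the next level

    # Level 2: keyword matching
    try:
        return await keyword_search(query)
    except Exception:
        pass  # fall through to the canned answer

    # Level 3: canned answer
    return "Sorry, I can't answer your question right now."
```
3.2 Feature Degradation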
```mermaid
graph TD
    A["User request"] --> B{"Feature healthy?"}
    B -->|"Yes"| C["Full agent"]
    B -->|"No"| D{"Tools available?"}
    D -->|"Yes"| E["Simplified agent"]
    D -->|"No"| F["FAQ bot"]
```
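The decision flow in the diagram can be written as a small routing function. This is a sketch only: `AgentTier` and `select_tier` are hypothetical names, and how the two health flags are computed is left to the surrounding system.

```python
from enum import Enum


class AgentTier(Enum):
    FULL = "full_agent"
    SIMPLIFIED = "simplified_agent"
    FAQ = "faq_bot"


def select_tier(feature_healthy: bool, tools_available: bool) -> AgentTier:
    """Pick the richest tier that the current health state allows."""
    if feature_healthy:
        return AgentTier.FULL
    if tools_available:
        return AgentTier.SIMPLIFIED
    return AgentTier.FAQ


if __name__ == "__main__":
    print(select_tier(True, True))    # AgentTier.FULL
    print(select_tier(False, True))   # AgentTier.SIMPLIFIED
    print(select_tier(False, False))  # AgentTier.FAQ
```

Keeping the routing decision in one pure function makes it easy to unit-test every branch of the diagram.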
3.3 A Structured Degradation Framework
In production, degradation should adjust dynamically to the state of each dependency rather than rely on nested try-except blocks:
```python
from dataclasses import dataclass
from enum import Enum


class ServiceLevel(Enum):
    FULL = "full"          # full functionality
    DEGRADED = "degraded"  # partial functionality
    MINIMAL = "minimal"    # minimum functionality
    OFFLINE = "offline"    # static content only


@dataclass
class ServiceHealth:
    llm_available: bool = True
    search_available: bool = True
    database_available: bool = True
    cache_available: bool = True


class GracefulDegradation:
    """Graceful degradation framework.

    `agent`, `llm`, `knowledge_base_rag`, and `match_faq` are assumed to be
    provided by the surrounding application.
    """

    def __init__(self):
        self.health = ServiceHealth()
        # missing service -> degradation rule
        self.degradation_rules = {
            "no_search": {
                "level": ServiceLevel.DEGRADED,
                "fallback": "knowledge_base_only",
                "message": "Search is unavailable; answering from the knowledge base",
            },
            "no_llm": {
                "level": ServiceLevel.MINIMAL,
                "fallback": "template_responses",
                "message": "The AI service is unavailable; returning canned answers",
            },
            "no_database": {
                "level": ServiceLevel.DEGRADED,
                "fallback": "llm_knowledge_only",
                "message": "The database is unavailable; answering from model knowledge",
            },
        }

    def determine_level(self) -> tuple[ServiceLevel, list[str]]:
        """Determine the service level from current health."""
        missing = []
        if not self.health.llm_available:
            missing.append("no_llm")
        if not self.health.search_available:
            missing.append("no_search")
        if not self.health.database_available:
            missing.append("no_database")

        if not missing:
            return ServiceLevel.FULL, []

        # Take the most severe level among the triggered rules
        levels = [self.degradation_rules[m]["level"] for m in missing]
        severity = {
            ServiceLevel.FULL: 0,
            ServiceLevel.DEGRADED: 1,
            ServiceLevel.MINIMAL: 2,
            ServiceLevel.OFFLINE: 3,
        }
        worst = max(levels, key=lambda level: severity[level])
        return worst, missing

    async def handle_request(self, query: str) -> dict:
        """Handle a request according to the current service level."""
        level, missing = self.determine_level()

        if level == ServiceLevel.FULL:
            result = await self._full_service(query)
        elif level == ServiceLevel.DEGRADED:
            result = await self._degraded_service(query, missing)
        elif level == ServiceLevel.MINIMAL:
            result = await self._minimal_service(query)
        else:
            result = self._offline_response()

        result["service_level"] = level.value
        return result

    async def _full_service(self, query: str) -> dict:
        """Full service."""
        response = await agent.run(query)
        return {"response": response, "quality": "high"}

    async def _degraded_service(self, query: str, missing: list[str]) -> dict:
        """Degraded service."""
        messages = [self.degradation_rules[m]["message"] for m in missing]

        if "no_search" in missing:
            # Search is down: use the knowledge base
            response = await knowledge_base_rag(query)
        elif "no_database" in missing:
            # Database is down: rely on the LLM's own knowledge
            response = await llm.complete(query)
        else:
            response = await agent.run(query)

        return {"response": response, "quality": "medium", "warnings": messages}

    async def _minimal_service(self, query: str) -> dict:
        """Minimal service: template matching or FAQ lookup."""
        response = match_faq(query) or "Service is limited right now. Please try again later."
        return {"response": response, "quality": "low"}

    def _offline_response(self) -> dict:
        return {"response": "System under maintenance. Please try again later.", "quality": "none"}
```
4. Input Validation and Output Parsing
4.1 Input Validation
Agent input comes from users and may be malformed, malicious, or beyond what the agent can handle:
```python
import re
from dataclasses import dataclass


@dataclass
class ValidationResult:
    is_valid: bool
    errors: list[str]
    warnings: list[str]
    sanitized_input: str | None = None


class InputValidator:
    """Input validator for agents."""

    MAX_INPUT_LENGTH = 10000
    MAX_TOOL_PARAMS = 20

    # (pattern, weight) pairs. The weights are illustrative and should be tuned:
    # one strong match should be enough to reject, a weak one only to warn.
    INJECTION_PATTERNS = [
        (r"忽略.{0,5}(之前的|上面|所有|全部).{0,5}(指令|规则|提示)", 0.9),  # Chinese "ignore previous instructions"
        (r"(forget|ignore|disregard).{0,10}(previous|above|all).{0,10}(instructions|rules)", 0.9),
        (r"<\|im_start\|>", 0.9),  # chat-template control token
        (r"system:", 0.6),
        (r"你是一个", 0.3),          # Chinese "you are a ..."; also common in benign text
        (r"\\n\\n", 0.3),            # literal "\n\n" escape sequences
    ]

    def validate(self, user_input: str) -> ValidationResult:
        errors = []
        warnings = []

        # 1. Length check
        if len(user_input) > self.MAX_INPUT_LENGTH:
            errors.append(f"Input too long: {len(user_input)} > {self.MAX_INPUT_LENGTH}")
        elif len(user_input) > self.MAX_INPUT_LENGTH * 0.8:
            warnings.append("Input is close to the length limit, which may affect quality")

        # 2. Empty-input check
        if not user_input.strip():
            errors.append("Input is empty")

        # 3. Injection detection
        injection_score = self._detect_injection(user_input)
        if injection_score > 0.8:
            errors.append("Input contains a suspected injection")
        elif injection_score > 0.5:
            warnings.append("Input contains suspicious content")

        # 4. Encoding check
        if not self._is_valid_encoding(user_input):
            errors.append("Input contains invalid characters")

        # 5. Script check (optional)
        if self._contains_mixed_scripts(user_input):
            warnings.append("Input mixes writing systems, which may affect understanding")

        sanitized = self._sanitize(user_input) if not errors else None

        return ValidationResult(
            is_valid=len(errors) == 0,
            errors=errors,
            warnings=warnings,
            sanitized_input=sanitized,
        )

    def _detect_injection(self, text: str) -> float:
        """Return an injection risk score between 0 and 1."""
        score = sum(
            weight
            for pattern, weight in self.INJECTION_PATTERNS
            if re.search(pattern, text, re.IGNORECASE)
        )
        return min(score, 1.0)

    def _is_valid_encoding(self, text: str) -> bool:
        # Fails only for strings that cannot be encoded, such as lone surrogates
        try:
            text.encode("utf-8")
            return True
        except UnicodeEncodeError:
            return False

    def _contains_mixed_scripts(self, text: str) -> bool:
        has_cjk = any("\u4e00" <= c <= "\u9fff" for c in text)
        has_cyrillic = any("\u0400" <= c <= "\u04ff" for c in text)
        has_arabic = any("\u0600" <= c <= "\u06ff" for c in text)
        scripts = [has_cjk, has_cyrillic, has_arabic]
        return sum(scripts) > 1

    def _sanitize(self, text: str) -> str:
        """Clean the input."""
        # Remove control characters
        sanitized = re.sub(r"[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]", "", text)
        # Normalize whitespace
        sanitized = re.sub(r"\s+", " ", sanitized).strip()
        return sanitized
```
4.2 Reliable Output Parsing
Unstable output formatting from the LLM is a common pain point in agent systems. The following parsing strategies make output handling more robust:
```python
import json
import re


class RobustOutputParser:
    """Robust parser for LLM output. `llm` is an assumed async client."""

    async def parse_json(self, text: str) -> dict | None:
        """Reliably extract JSON from LLM output."""
        # Strategy 1: parse directly
        try:
            return json.loads(text)
        except json.JSONDecodeError:
            pass

        # Strategy 2: extract a ```json ... ``` code block
        json_match = re.search(r"```(?:json)?\s*\n?(.*?)\n?```", text, re.DOTALL)
        if json_match:
            try:
                return json.loads(json_match.group(1))
            except json.JSONDecodeError:
                pass

        # Strategy 3: take everything from the first { to the last }
        first_brace = text.find("{")
        last_brace = text.rfind("}")
        if first_brace != -1 and last_brace > first_brace:
            try:
                return json.loads(text[first_brace:last_brace + 1])
            except json.JSONDecodeError:
                pass

        # Strategy 4: ask the LLM to repair its own output
        return await self._llm_repair_json(text)

    async def _llm_repair_json(self, broken_text: str) -> dict | None:
        """Use the LLM to repair broken JSON."""
        repair_prompt = (
            "The following text should be a JSON object, but it is malformed. "
            "Fix it and output valid JSON.\n\n"
            f"Original text:{broken_text}\n\n"
            "Repaired JSON:"
        )

        for _ in range(2):
            try:
                repaired = await llm.complete(repair_prompt)
                return json.loads(repaired)
            except Exception:
                continue  # call failed or output is still malformed; try again

        return None

    def parse_action(self, text: str) -> dict:
        """Extract the Action from ReAct-style text."""
        # Match several common formats
        patterns = [
            r"Action:\s*(\w+)\s*\((.*)\)",  # Action: search(query="test")
            r"Action:\s*(\w+)\s*\[(.*)\]",  # Action: search[query="test"]
            # JSON block: {"action": "search", "input": {...}}
            r'```(?:json)?\s*\n?\{.*?"action":\s*"(\w+)".*?"input":\s*(\{.*?\})',
        ]

        for pattern in patterns:
            match = re.search(pattern, text, re.DOTALL)
            if match:
                return {
                    "tool": match.group(1),
                    "input": self._parse_action_input(match.group(2)),
                }

        # Could not parse: return an empty action
        return {"tool": None, "input": {}}

    def _parse_action_input(self, raw: str) -> dict:
        """Minimal assumed helper: parse JSON arguments, else keep the raw string."""
        try:
            parsed = json.loads(raw)
            return parsed if isinstance(parsed, dict) else {"value": parsed}
        except json.JSONDecodeError:
            return {"raw": raw.strip()}

    def parse_final_answer(self, text: str) -> str:
        """Extract the Final Answer."""
        patterns = [
            r"Final Answer:\s*(.*)",
            r"最终答案[::]\s*(.*)",  # Chinese "Final answer:"
            r"答案[::]\s*(.*)",      # Chinese "Answer:"
        ]

        for pattern in patterns:
            match = re.search(pattern, text, re.DOTALL)
            if match:
                return match.group(1).strip()

        # No explicit Final Answer marker: return the whole text
        return text.strip()
```
4.3 Output Validation
After parsing, the output still needs to be checked for plausibility:
```python
import re
from collections import Counter


class OutputValidator:
    """Output validator. `_validate_schema` is assumed to be implemented elsewhere."""

    def validate(self, response: str, schema: dict | None = None) -> dict:
        issues = []
        text = response or ""  # treat None as an empty response

        # 1. Empty-response check
        if not text.strip():
            issues.append({"type": "empty", "severity": "critical"})

        # 2. Length check
        if len(text) > 50000:
            issues.append({"type": "too_long", "severity": "warning"})
        elif len(text) < 10:
            issues.append({"type": "too_short", "severity": "warning"})

        # 3. Repetition check (LLMs sometimes repeat themselves)
        if self._has_excessive_repetition(text):
            issues.append({"type": "repetition", "severity": "warning"})

        # 4. Harmful-content check
        if self._contains_harmful_content(text):
            issues.append({"type": "harmful", "severity": "critical"})

        # 5. Schema validation (if a schema is given)
        if schema:
            schema_issues = self._validate_schema(text, schema)
            issues.extend(schema_issues)

        return {
            "is_valid": not any(i["severity"] == "critical" for i in issues),
            "issues": issues,
        }

    def _has_excessive_repetition(self, text: str) -> bool:
        """Detect excessive repetition of whole sentences."""
        # Split on Chinese and Western sentence terminators
        sentences = re.split(r"[。.!?!?]", text)
        if len(sentences) < 3:
            return False
        # Flag any sentence that appears more than 3 times
        counter = Counter(s.strip() for s in sentences if s.strip())
        return any(count > 3 for count in counter.values())

    def _contains_harmful_content(self, text: str) -> bool:
        """Simple check; the patterns here only cover leaked personal data."""
        harmful_patterns = [
            r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b",  # phone number
            r"\b\d{3}[-.]?\d{2}[-.]?\d{4}\b",  # SSN
            r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",  # email address
        ]
        return any(re.search(p, text) for p in harmful_patterns)
```
5. Rate Limiting
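5.1 Token Rate Limiting
```python
import asyncio
import time
from collections import deque


class TokenRateLimiter:
    def __init__(self, max_tokens: int, window_seconds: int):
        self.max_tokens = max_tokens
        self.window = window_seconds
        self.requests = deque()  # (timestamp, tokens) pairs, oldest first

    async def acquire(self, tokens: int):
        if tokens > self.max_tokens:
            raise ValueError("request exceeds the per-window token budget")

        while True:
            now = time.time()
            # Drop requests that have left the window
            while self.requests and now - self.requests[0][0] >= self.window:
                self.requests.popleft()

            used = sum(t for _, t in self.requests)
            if used + tokens <= self.max_tokens:
                break

            # Wait until the oldest request expires, then re-check
            wait_time = self.window - (now - self.requests[0][0])
            await asyncio.sleep(max(wait_time, 0))

        self.requests.append((now, tokens))
```
5.2 Concurrency Limiting
```python
from asyncio import Semaphore

MAX_CONCURRENT = 10
semaphore = Semaphore(MAX_CONCURRENT)


async def limited_agent_call(query: str):
    # At most MAX_CONCURRENT calls run at once; `agent` is a placeholder
    async with semaphore:
        return await agent.process(query)
```
5.3 Priority-Based Rate Limiting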
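Different kinds of requests call for different rate-limiting policies:
```python
import asyncio
import time
from collections import deque
from dataclasses import dataclass
from enum import Enum


class Priority(Enum):
    CRITICAL = 0  # paying users, critical business flows
    NORMAL = 1    # regular users
    LOW = 2       # background jobs, batch processing


@dataclass
class RateLimitConfig:
    max_concurrent: int
    max_rpm: int  # requests per minute
    max_tpm: int  # tokens per minute


PRIORITY_LIMITS = {
    Priority.CRITICAL: RateLimitConfig(max_concurrent=20, max_rpm=120, max_tpm=200000),
    Priority.NORMAL: RateLimitConfig(max_concurrent=10, max_rpm=60, max_tpm=100000),
    Priority.LOW: RateLimitConfig(max_concurrent=3, max_rpm=10, max_tpm=30000),
}


class PriorityRateLimiter:
    """Rate limiter with per-priority limits."""

    def __init__(self):
        self.semaphores = {
            p: asyncio.Semaphore(c.max_concurrent)
            for p, c in PRIORITY_LIMITS.items()
        }
        self.request_counts = {p: deque() for p in Priority}

    async def acquire(self, priority: Priority, estimated_tokens: int = 0):
        """Acquire permission to run. Pair every call with release()."""
        config = PRIORITY_LIMITS[priority]
        timestamps = self.request_counts[priority]

        # Concurrency limit
        await self.semaphores[priority].acquire()

        # RPM limit: wait until the oldest request leaves the 60-second window
        while True:
            now = time.time()
            self._cleanup_old(timestamps, window=60)
            if len(timestamps) < config.max_rpm:
                break
            await asyncio.sleep(max(60 - (now - timestamps[0]), 0))

        timestamps.append(now)
        # max_tpm and estimated_tokens are not enforced here; a per-priority
        # TokenRateLimiter (section 5.1) could cover that.

    def release(self, priority: Priority):
        self.semaphores[priority].release()

    @staticmethod
    def _cleanup_old(timestamps: deque, window: float) -> None:
        """Drop timestamps older than `window` seconds."""
        cutoff = time.time() - window
        while timestamps and timestamps[0] <= cutoff:
            timestamps.popleft()
```
6. Health Checks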
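6.1 Agent Health Metrics
```python
from dataclasses import dataclass


@dataclass
class AgentHealth:
    success_rate: float      # target: > 0.95
    avg_latency_ms: float    # target: < 2000
    error_rate_by_type: dict
    cache_hit_rate: float    # target: > 0.5
```
6.2 Automatic Recovery
```python
import asyncio


async def health_check_loop():
    # check_agent_health, circuit_break, and scale_up are placeholders
    while True:
        health = await check_agent_health()

        # Test the more severe threshold first; otherwise it is unreachable
        if health.success_rate < 0.5:
            await circuit_break()
        elif health.success_rate < 0.8:
            await scale_up()

        await asyncio.sleep(30)
```
6.3 A Complete Health Check System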
```python
import asyncio
import time
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Awaitable, Callable


@dataclass
class HealthCheckResult:
    status: str  # healthy / degraded / unhealthy
    checks: dict[str, bool]
    latency_ms: float
    last_error: str | None
    timestamp: datetime


class AgentHealthChecker:
    """Agent health check system.

    `llm`, `search_tool`, `db`, `redis_client`, and `_get_last_error` are
    assumed to be provided elsewhere.
    """

    def __init__(self):
        self.history: list[HealthCheckResult] = []
        self.alert_handlers: list[Callable[[HealthCheckResult], Awaitable[None]]] = []

    async def check(self) -> HealthCheckResult:
        """Run the full health check."""
        checks = {}
        start_time = time.time()

        # Check 1: LLM API availability
        checks["llm_api"] = await self._check_llm_api()

        # Check 2: tool service availability
        checks["tools"] = await self._check_tools()

        # Check 3: database connection
        checks["database"] = await self._check_database()

        # Check 4: cache service
        checks["cache"] = await self._check_cache()

        # Check 5: recent error rate
        checks["error_rate"] = self._check_error_rate()

        latency = (time.time() - start_time) * 1000
        all_healthy = all(checks.values())
        mostly_healthy = sum(checks.values()) >= len(checks) * 0.6

        status = "healthy" if all_healthy else ("degraded" if mostly_healthy else "unhealthy")

        result = HealthCheckResult(
            status=status,
            checks=checks,
            latency_ms=latency,
            last_error=self._get_last_error(),
            timestamp=datetime.now(),
        )

        self.history.append(result)

        # Fire alerts
        if status != "healthy":
            for handler in self.alert_handlers:
                await handler(result)

        return result

    async def _check_llm_api(self) -> bool:
        """Check whether the LLM API is reachable."""
        try:
            response = await asyncio.wait_for(
                llm.complete("Hello, respond with 'OK'."),
                timeout=10.0,
            )
            return "ok" in response.lower()
        except Exception:
            return False

    async def _check_tools(self) -> bool:
        """Check whether the core tools are available."""
        try:
            result = await search_tool("test")
            return result is not None
        except Exception:
            return False

    async def _check_database(self) -> bool:
        """Check the database connection."""
        try:
            await db.execute("SELECT 1")
            return True
        except Exception:
            return False

    async def _check_cache(self) -> bool:
        """Check the cache service."""
        try:
            # Assumes a synchronous Redis client; await the call if yours is async
            redis_client.ping()
            return True
        except Exception:
            return False

    def _check_error_rate(self) -> bool:
        """Check the share of non-healthy results over the last 10 minutes."""
        cutoff = datetime.now() - timedelta(minutes=10)
        recent = [r for r in self.history if r.timestamp > cutoff]
        if not recent:
            return True
        error_rate = sum(1 for r in recent if r.status != "healthy") / len(recent)
        return error_rate < 0.3
```
7. Adversarial Robustness
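7.1 The Red Teaming Concept
Red teaming is a systematic way of finding vulnerabilities in an agent. For agents, the main attack surfaces are:
```mermaid
flowchart TD
    A["Agent attack surface"] --> B["User input"]
    A --> C["Tool results"]
    A --> D["Memory system"]
    A --> E["Inter-agent communication"]
    B --> B1["Prompt injection"]
    B --> B2["Jailbreak"]
    C --> C1["Tool poisoning"]
    D --> D1["Memory poisoning"]
    E --> E1["Message forgery"]
```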
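Of the attack surfaces above, message forgery between agents has a well-understood mitigation: authenticate each message with a shared-secret signature. The following is a minimal sketch using the standard library's `hmac` module; `sign_message`, `verify_message`, and the message layout (`body` plus `sig`) are hypothetical, and key distribution and rotation are out of scope.

```python
import hashlib
import hmac
import json
from typing import Optional


def sign_message(payload: dict, key: bytes) -> dict:
    """Serialize the payload deterministically and attach an HMAC-SHA256 tag."""
    body = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    sig = hmac.new(key, body.encode("utf-8"), hashlib.sha256).hexdigest()
    return {"body": body, "sig": sig}


def verify_message(message: dict, key: bytes) -> Optional[dict]:
    """Return the payload if the signature is valid, otherwise None."""
    expected = hmac.new(key, message["body"].encode("utf-8"), hashlib.sha256).hexdigest()
    # compare_digest avoids leaking information through timing differences
    if not hmac.compare_digest(expected, message.get("sig", "")):
        return None
    return json.loads(message["body"])


if __name__ == "__main__":
    key = b"shared-secret-for-demo-only"
    msg = sign_message({"from": "planner", "task": "summarize"}, key)
    print(verify_message(msg, key))                  # payload is returned
    forged = {**msg, "body": msg["body"].replace("summarize", "delete")}
    print(verify_message(forged, key))               # None
```

A receiving agent that gets `None` back should drop the message rather than act on it.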
7.2 Common Adversarial Attacks and Defenses
```python
import re


class AdversarialDefense:
    """Adversarial defense."""

    def __init__(self):
        self.max_turns = 20
        self.budget_per_user = 100  # token budget per user per hour

    def check_input_safety(self, user_input: str) -> dict:
        """Run safety checks on the input."""
        risks = []

        # Check 1: prompt injection
        if self._detect_prompt_injection(user_input):
            risks.append({"type": "prompt_injection", "severity": "high"})

        # Check 2: jailbreak attempt
        if self._detect_jailbreak(user_input):
            risks.append({"type": "jailbreak", "severity": "high"})

        # Check 3: request for sensitive information
        if self._detect_sensitive_request(user_input):
            risks.append({"type": "sensitive_request", "severity": "medium"})

        return {
            "is_safe": not any(r["severity"] == "high" for r in risks),
            "risks": risks,
        }

    def _detect_prompt_injection(self, text: str) -> bool:
        patterns = [
            r"忽略.*指令",  # Chinese "ignore ... instructions"
            r"forget.*instructions",
            r"new instructions",
            r"system\s*:",
            r"<\|im_start\|>",
        ]
        return any(re.search(p, text, re.IGNORECASE) for p in patterns)

    def _detect_jailbreak(self, text: str) -> bool:
        patterns = [
            r"DAN\s+mode",
            r"developer\s+mode",
            r"jailbreak",
            r"越狱",  # Chinese "jailbreak"
        ]
        return any(re.search(p, text, re.IGNORECASE) for p in patterns)

    def _detect_sensitive_request(self, text: str) -> bool:
        patterns = [
            r"系统.*(提示|prompt)",  # Chinese "system ... prompt"
            r"(password|secret|api.?key)",
            r"数据库.*(密码|连接串)",  # Chinese "database ... password / connection string"
        ]
        return any(re.search(p, text, re.IGNORECASE) for p in patterns)
```
7.3 Output Safety Filtering
```python
import re


class OutputSafetyFilter:
    """Output safety filter."""

    # (pattern, replacement) pairs applied in order
    SENSITIVE_PATTERNS = [
        (r"sk-[a-zA-Z0-9]{32,}", "[API_KEY_REDACTED]"),
        (r"\b\d{16,19}\b", "[CARD_NUMBER_REDACTED]"),
        (r"\b\d{3}-\d{2}-\d{4}\b", "[SSN_REDACTED]"),
        (r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}", "[EMAIL_REDACTED]"),
    ]

    def filter(self, response: str) -> str:
        """Redact sensitive information."""
        for pattern, replacement in self.SENSITIVE_PATTERNS:
            response = re.sub(pattern, replacement, response)
        return response
```
8. Summary
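| Pattern | Purpose | Effect |
|---|---|---|
| Retry + backoff | Transient failures | Recovers about 30% of failures |
| Circuit breaker | Cascading failures | Prevents collapse |
| Degradation | Partial failures | Keeps the service available |
| Rate limiting | Overload protection | Stabilizes the service |
| Input validation | Malicious input | Blocks attacks |
| Output validation | Format errors | Raises the success rate |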
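The circuit breaker row is the least obvious pattern in the table, so here is a minimal library-free sketch of its state machine (closed, open, half-open). `SimpleCircuitBreaker` and `CircuitOpenError` are hypothetical names, the breaker is synchronous for brevity, and the injectable `clock` exists only to make the behavior testable.

```python
import time


class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""


class SimpleCircuitBreaker:
    def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 60.0,
                 clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.clock = clock
        self.failures = 0      # failures since the last success
        self.opened_at = None  # time the circuit opened, or None when closed

    @property
    def state(self) -> str:
        if self.opened_at is None:
            return "closed"
        if self.clock() - self.opened_at >= self.recovery_timeout:
            return "half_open"  # recovery period elapsed: allow a trial call
        return "open"

    def call(self, func, *args, **kwargs):
        if self.state == "open":
            raise CircuitOpenError("circuit is open; call rejected")
        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failures += 1
            # A failed trial call, or too many failures, (re)opens the circuit
            if self.state == "half_open" or self.failures >= self.failure_threshold:
                self.opened_at = self.clock()
            raise
        # Any success closes the circuit and resets the failure count
        self.failures = 0
        self.opened_at = None
        return result


if __name__ == "__main__":
    breaker = SimpleCircuitBreaker(failure_threshold=2, recovery_timeout=5.0)

    def flaky():
        raise ConnectionError("upstream down")

    for _ in range(3):
        try:
            breaker.call(flaky)
        except (ConnectionError, CircuitOpenError) as e:
            print(type(e).__name__, "-", breaker.state)
```

Rejecting calls while the circuit is open is what stops a failing dependency from tying up every request that touches it.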
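8.1 Reliability Design Checklist
- Retry strategy: every LLM call and tool call should retry with exponential backoff
- Circuit breaking: put a circuit breaker in front of external APIs to prevent cascading failures
- Degradation plan: prepare at least one fallback level for every core feature
- Input validation: filter malicious input and limit input length and format
- Output validation: check LLM output for format, length, and content safety
- Rate limiting: control concurrency and request rate by priority
- Health checks: probe each component on a schedule, with automatic alerting and recovery
- Adversarial defense: guard against prompt injection, jailbreaks, and tool poisoning
Reliability is the foundation of a production agent system.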