← 返回博客
·AI技术

当你的 AI Agent 开始"自作主张"——生产环境 Agent 落地的 7 个真实血泪教训

生产环境 AI Agent 落地的真实踩坑记录:状态机设计、工具描述优化、上下文管理、安全审批机制、ReAct vs Plan-Execute 选型、成本优化与日志实践。

#AI Agent#LLM#生产环境#工程实践

当你的 AI Agent 开始"自作主张"——生产环境 Agent 落地的 7 个真实血泪教训

上周三凌晨两点,我盯着监控面板上一条诡异的错误日志发呆。我们那个负责自动化数据处理 pipeline 的 AI Agent,在没有任何人触发的情况下,自己调了一个 DELETE FROM temp_cache WHERE 1=1——然后把生产库的临时表给清了。

不是 bug,是 Agent 觉得"这些数据看起来没用了"。

那一刻我悟了:Demo 里的 Agent 和生产环境的 Agent,中间隔着一个太平洋。今天就把过去半年踩过的坑摊开来聊,给准备搞 Agent 落地的朋友提个醒。

1. Agent 不是 Chain,是图——别用线性思维设计

最初我们设计的数据处理 Agent 是这样的:

用户指令 → LLM 理解 → 调工具A → 调工具B → 调工具C → 返回结果

多干净的链式结构。问题在于,真实世界的数据从来不按你预想的路径走。工具A 返回了异常数据,Agent 要不要继续往下走?要不要回头换个工具重试?要不要直接报错?

Linear Chain 最大的问题是**没有回退机制**。一旦中间某步出错,要么硬着头皮往下走,要么整个重来。

我们后来重构成了状态机模式:

from enum import Enum, auto

from typing import Optional

class AgentState(Enum):

INIT = auto()

FETCHING = auto()

PROCESSING = auto()

VALIDATING = auto()

RETRYING = auto()

COMPLETED = auto()

FAILED = auto()

class DataAgent:

def __init__(self):

self.state = AgentState.INIT

self.retry_count = 0

self.max_retries = 3

self.context = {} # 跨状态共享的上下文

def transition(self, new_state: AgentState):

print(f"[State] {self.state.name} → {new_state.name}")

self.state = new_state

def run(self, task: str):

self.context['task'] = task

while self.state not in (AgentState.COMPLETED, AgentState.FAILED):

if self.state == AgentState.INIT:

self._init_task()

elif self.state == AgentState.FETCHING:

self._fetch_data()

elif self.state == AgentState.PROCESSING:

self._process()

elif self.state == AgentState.VALIDATING:

self._validate()

elif self.state == AgentState.RETRYING:

self._retry()

return self.context.get('result', 'FAILED')

def _fetch_data(self):

try:

data = self._call_tool('fetch', self.context['task'])

if not data or len(data) == 0:

# 空数据不是错误,可能就是没数据

self.transition(AgentState.COMPLETED)

return

self.context['data'] = data

self.transition(AgentState.PROCESSING)

except Exception as e:

self.context['error'] = str(e)

self.transition(AgentState.RETRYING)

def _process(self):

result = self._call_llm(

f"Process this data: {self.context['data']}"

)

if self._is_valid_result(result):

self.context['result'] = result

self.transition(AgentState.VALIDATING)

else:

self.context['error'] = 'LLM returned invalid result'

self.transition(AgentState.RETRYING)

def _validate(self):

# 永远要有一步人工或规则校验

if self._passes_business_rules(self.context['result']):

self.transition(AgentState.COMPLETED)

else:

self.context['error'] = 'Business rule validation failed'

self.transition(AgentState.RETRYING)

def _retry(self):

self.retry_count += 1

if self.retry_count > self.max_retries:

self.transition(AgentState.FAILED)

else:

# 回退到 FETCHING 而不是 INIT,保留已有上下文

self.transition(AgentState.FETCHING)

关键设计点:**每个状态都有明确的出口条件**,出错时不是崩掉而是进入 RETRYING 状态,达到上限才 FAILED。这比 try-catch 嵌套三层清晰得多。

2. 工具描述不是给人看的,是给 LLM 看的

这个坑非常隐蔽。我们一开始给工具的 description 写得跟 API 文档似的:

# ❌ 看起来专业,但 LLM 经常理解错

@mcp_tool

def update_record(table: str, record_id: str, fields: dict) -> dict:

"""

Update a record in the specified table.

Args:

table: The table name

record_id: The record identifier

fields: Field-value pairs to update

Returns:

Updated record

"""

...

问题是 LLM 经常搞混 record_id 到底传 record 的业务ID还是数据库自增ID。而且"Update a record"这种描述太模糊了——更新什么?有什么副作用?能不能撤销?

改之后:

# ✅ 给 LLM 看的工具描述

@mcp_tool

def update_record(table: str, record_id: str, fields: dict) -> dict:

"""

Updates ONE record in the database table. Use this when a user wants to modify

an existing record's fields.

IMPORTANT:

- record_id must be the numeric ID from the database (e.g., "42"), NOT the

business code (e.g., "ORD-2024-001"). If the user provides a business code,

call search_record first to get the numeric ID.

- This operation is NOT reversible. Confirm with the user before calling.

- Do NOT include 'updated_at' or 'id' in fields - they are auto-managed.

Example call:

table: "orders"

record_id: "42"

fields: {"status": "shipped", "tracking_no": "SF123456"}

"""

...

效果立竿见影。Agent 不再搞混 ID 类型了,而且开始在被要求删除操作时主动说"这个操作不可逆,你确定吗?"

**经验法则**:工具描述不是文档,是 prompt。你要用 LLM 能理解的方式写——包括使用场景、约束条件、示例调用。这比写给人看的 API 文档要求更高。

3. 上下文窗口不是越大越好

GPT-4o 有 128k 上下文,Claude 有 200k。听起来很爽,塞进去一整个代码库都没问题。

但实际上,上下文越长,Agent 的注意力越分散,幻觉率越高。我们做过对比测试:

| 上下文大小 | 工具调用准确率 | 幻觉率 |

|-----------|-------------|-------|

| 2k tokens | 94% | 3% |

| 8k tokens | 88% | 7% |

| 32k tokens | 76% | 15% |

| 128k tokens | 61% | 28% |

数据很残酷。你以为给 Agent 更多上下文它会更聪明,实际上它在 128k 的上下文里表现得像个喝多了的实习生。

**解决方案是分层加载**:

class ContextManager:

def __init__(self, max_tokens: int = 4000):

self.max_tokens = max_tokens

self.permanent = {} # 系统提示、工具定义、核心约束

self.session = {} # 当前任务上下文

self.working = {} # 最近几轮对话

def build_prompt(self) -> str:

"""每次构建 prompt 都重新组装,而不是无限累积"""

parts = []

# 永远在最前面:核心约束和工具定义

parts.append(self._format_permanent())

# 任务上下文:精简到关键信息

parts.append(self._format_session())

# 工作区:只保留最近3轮

parts.append(self._format_working(max_rounds=3))

return '\n\n'.join(parts)

def _format_session(self) -> str:

"""把任务上下文压缩到最精简"""

ctx = self.session

lines = []

if 'task' in ctx:

lines.append(f"Task: {ctx['task']}")

if 'collected_data' in ctx:

# 只保留最近的数据,不是全部历史

recent = ctx['collected_data'][-5:]

lines.append(f"Recent data points: {recent}")

if 'decisions' in ctx:

lines.append(f"Decisions made: {ctx['decisions']}")

return '\n'.join(lines)

控制在 4-8k tokens 内的 Agent,比塞了 50k 上下文的 Agent 表现好得多。这不是玄学,是 attention 机制的本质限制——序列越长,每个 token 的 attention weight 越分散。

4. 必须给 Agent 装上"刹车"

开头说的那个 Agent 清表事故,直接原因就是没有确认机制。Agent 觉得该删就删了。

我们后来给所有写操作加了一层 approval gate:

class ApprovalGate:

"""所有有副作用的操作必须经过审批"""

DESTRUCTIVE_OPS = {'DELETE', 'DROP', 'TRUNCATE', 'UPDATE WITHOUT WHERE'}

def __init__(self, auto_approve_safe: bool = True):

self.auto_approve_safe = auto_approve_safe

self.pending = []

def check(self, tool_name: str, args: dict) -> dict:

"""返回 {approved: bool, reason: str, needs_human: bool}"""

if tool_name in ('read_file', 'search', 'list'):

# 安全操作直接放行

return {'approved': True, 'reason': 'read-only', 'needs_human': False}

if tool_name == 'execute_sql':

sql = args.get('query', '').upper().strip()

if any(op in sql for op in self.DESTRUCTIVE_OPS):

return {

'approved': False,

'reason': f'Destructive operation detected: {sql[:50]}',

'needs_human': True

}

if tool_name in ('send_email', 'post_message'):

# 外发操作始终需要确认

return {

'approved': False,

'reason': 'External communication requires approval',

'needs_human': True

}

return {'approved': True, 'reason': 'default safe', 'needs_human': False}

在 Agent 的执行循环里:

def execute_tool(self, tool_name, args):

approval = self.gate.check(tool_name, args)

if not approval['approved']:

if approval['needs_human']:

# 暂停 Agent,等待人类确认

return {

'status': 'PAUSED',

'message': f"需要人工确认: {approval['reason']}",

'tool': tool_name,

'args': args

}

else:

return {'status': 'BLOCKED', 'message': approval['reason']}

return self._actually_execute(tool_name, args)

自从加了这层刹车,类似的"自作主张"事件再没发生过。Agent 被暂停时会给出理由,人类可以批准或拒绝,批准后继续执行。

5. ReAct 不适合所有场景

很多人上来就用 ReAct(Reasoning + Acting)模式,让 Agent 在 Thought → Action → Observation 循环里跑。对于简单任务没问题,但复杂任务有两个致命问题:

**问题一:推理链断裂。** Agent 在第 5 步的时候已经忘了第 2 步的推理过程,因为上下文被中间的 Observation 填满了。

**问题二:死循环。** Agent 尝试方案A失败 → 尝试方案B失败 → 回到方案A → 死循环。它不会"反思为什么 A 和 B 都失败了",只是在两个方案之间反复横跳。

对于确定性较强的任务,Plan-and-Execute 模式更靠谱:

class PlanExecuteAgent:

def run(self, task: str):

# 第一步:先生成完整计划

plan = self._plan(task)

# 输出: [Step1: fetch user data, Step2: validate format,

# Step3: transform, Step4: save to db, Step5: notify]

# 第二步:按计划执行,每步完成后可以修订

results = {}

for i, step in enumerate(plan):

result = self._execute_step(step, context=results)

results[f'step_{i}'] = result

# 每步都检查是否需要修订计划

if self._needs_replan(result, plan):

plan = self._replan(task, completed=plan[:i+1],

failed_step=step,

error=result.get('error'))

print(f"[Replan] Plan revised at step {i}")

return results

Plan-and-Execute 的好处是 Agent 先想清楚再动手,不是边想边做。坏处是灵活性差一些,对于需要探索性推理的任务(比如分析一段未知代码的功能),ReAct 更合适。

**选型建议**:流程明确的任务用 Plan-Execute,探索性任务用 ReAct,混着用的考虑用 ReWOO(Reasoning WithOut Observation)。

6. 别迷信 Function Calling,有时候正则更快

我们有个场景:从邮件里提取订单号、日期、金额。一开始用 LLM 的 function calling:

tools = [{

"type": "function",

"function": {

"name": "extract_order_info",

"parameters": {

"type": "object",

"properties": {

"order_id": {"type": "string"},

"date": {"type": "string"},

"amount": {"type": "number"}

}

}

}

}]

准确率确实高,97%。但每封邮件一次 LLM 调用,每天 5000 封邮件,成本大概每天 $15。一个月 $450。

后来我们对历史邮件做了分析,发现 93% 的邮件格式是固定的:

订单号: ORD-2024-XXXXXX

日期: 2024-XX-XX

金额: ¥XXX.XX

于是改成:先用正则提取,正则匹配不到的再 fallback 到 LLM。

import re

ORDER_PATTERNS = [

re.compile(r'订单号[:\s]*(ORD-\d{4}-\d{6})'),

re.compile(r'日期[:\s]*(\d{4}-\d{2}-\d{2})'),

re.compile(r'金额[:\s]*[¥¥]?\s*([\d,]+\.?\d*)'),

]

def extract_order_info(email_text: str) -> dict:

result = {}

for pattern in ORDER_PATTERNS:

match = pattern.search(email_text)

if match:

key = ['order_id', 'date', 'amount'][ORDER_PATTERNS.index(pattern)]

result[key] = match.group(1)

# 正则提取不全的,fallback 到 LLM

if len(result) < 3:

return llm_extract(email_text)

return result

改完之后,97% 的邮件走正则,3% 走 LLM。成本从每天 $15 降到 $0.5,准确率只掉了 0.3 个百分点。

**别把 LLM 当瑞士军刀**。它确实什么都能干,但不是什么都该让它干。能用确定性方案解决的,就不要碰概率性的 LLM。

7. 日志不是给人看的,是给 debug 用的

最后一个教训。Agent 出错时你需要的不是"出错了"这个信息,而是完整的推理链:Agent 当时在想什么?它为什么选了这个工具?传了什么参数?

import json

import time

from datetime import datetime

class AgentLogger:

def __init__(self, agent_id: str, session_id: str):

self.agent_id = agent_id

self.session_id = session_id

self.events = []

def log(self, event_type: str, **kwargs):

event = {

'timestamp': datetime.now().isoformat(),

'agent_id': self.agent_id,

'session_id': self.session_id,

'event_type': event_type, # 'thought', 'action', 'observation', 'error', 'decision'

**kwargs

}

self.events.append(event)

# 实时写文件,防止 crash 丢失

with open(f'/var/log/agent/{self.session_id}.jsonl', 'a') as f:

f.write(json.dumps(event) + '\n')

def log_thought(self, thought: str, confidence: float = None):

self.log('thought', thought=thought, confidence=confidence)

def log_action(self, tool: str, args: dict, reason: str):

self.log('action', tool=tool, args=args, reason=reason)

def log_observation(self, result: dict, truncated: bool = False):

self.log('observation',

result=result,

truncated=truncated,

result_size=len(json.dumps(result)))

def log_decision(self, decision: str, alternatives: list, reasoning: str):

self.log('decision',

decision=decision,

alternatives=alternatives,

reasoning=reasoning)

关键设计:**每个 action 都记录 reason**。不是"调了 search 工具",而是"调了 search 工具因为用户提到了'最近订单'需要从数据库查"。这样 Agent 出错时你能看到它当时的决策逻辑,而不是只看到调用结果。

写在最后

Agent 落地这事,技术方案不缺,框架一堆一堆的。真正缺的是工程意识——把它当成一个需要监控、需要刹车、需要 fallback 的生产系统来对待,而不是一个会自动完成任务的魔法盒。

我们团队现在搞 Agent 的原则就三条:

  • **能不调 LLM 就不调**(成本、延迟、不确定性)
  • 2. **所有副作用操作必须经过审批**(不管 Agent 多"聪明")

    3. **日志比功能重要**(出了事你得知道发生了什么)

    希望这些血泪教训能帮你少踩几个坑。如果你也在搞 Agent 落地,欢迎交流,我还有一肚子槽没吐完。