Agentic AI(代理式人工智能)

概述

Agentic AI(代理式人工智能)是指能够自主追求目标、执行多步骤工作流的 AI 系统。与简单的问答式 Chatbot 不同,Agentic AI 能够:

  • 感知环境:通过工具获取外部信息
  • 推理决策:基于上下文制定行动计划
  • 执行动作:调用工具完成具体任务
  • 观察反馈:根据执行结果调整下一步行动

2026 年,64% 的 CIO 计划在 24 个月内部署代理式 AI(来源:Gartner 2026 CIO Agenda)

核心概念

Agentic AI vs 传统 Chatbot

特性传统 ChatbotAgentic AI
交互模式单轮问答多轮推理执行
工具使用主动调用工具
自主程度高(目标导向)
适用场景FAQ、客服复杂任务自动化

Agent Loop(智能体循环)

┌─────────────────────────────────────────────────────┐
│                      Agent Loop                      │
├─────────────────────────────────────────────────────┤
│                                                      │
│   ┌─────────┐    ┌──────────┐    ┌─────────┐       │
│   │Perceive │───▶│  Reason  │───▶│  Act    │       │
│   │  (感知) │    │  (推理)  │    │ (执行)  │       │
│   └─────────┘    └──────────┘    └─────────┘       │
│        ▲              │              │               │
│        │              │              ▼               │
│        │         ┌──────────┐  ┌─────────┐          │
│        └─────────│Observe   │◀─│Result   │          │
│                  │(观察反馈)│  │(结果)   │          │
│                  └──────────┘  └─────────┘          │
└─────────────────────────────────────────────────────┘

循环终止条件

  • 达成目标
  • 达到最大迭代次数
  • 检测到不可恢复的错误

架构模式

1. ReAct(Reasoning + Acting)

将推理和执行交织进行,适合需要灵活调整策略的任务:

def react_agent(query: str, tools: list[Tool]) -> str:
    thought = "我需要分析这个问题..."
    action = select_tool(thought, tools)
    observation = execute(action)
    
    while not finished:
        thought = reason(observation, query)
        if "answer" in thought:
            return extract_answer(thought)
        action = select_tool(thought, tools)
        observation = execute(action)

2. Plan-and-Execute

先生成完整计划,再按顺序执行,适合可预测的任务:

def plan_and_execute(query: str, tools: list[Tool]) -> str:
    # 1. 规划阶段
    plan = planner.create_plan(query)  # ["步骤1", "步骤2", ...]
    
    # 2. 执行阶段
    for step in plan:
        result = execute_step(step, tools)
        if not validate(result):
            plan = replan(query, result)  # 重新规划
            break
    
    return aggregate_results(results)

3. Supervisor(多智能体协作)

一个协调者智能体将任务分配给专业子智能体:

┌──────────────────────────────────────────────┐
│              Supervisor Agent                │
│  ┌────────────────────────────────────────┐ │
│  │  任务:分析代码库并生成文档             │ │
│  └────────────────────────────────────────┘ │
│                    │                         │
│         ┌──────────┼──────────┐             │
│         ▼          ▼          ▼             │
│   ┌───────────┐ ┌───────────┐ ┌───────────┐ │
│   │  Code     │ │  API      │ │  Doc      │ │
│   │  Reader   │ │  Analyzer │ │  Writer   │ │
│   │  Agent    │ │  Agent    │ │  Agent    │ │
│   └───────────┘ └───────────┘ └───────────┘ │
└──────────────────────────────────────────────┘

工具调用

工具定义架构

from typing import Annotated, Any
from pydantic import BaseModel, Field
 
class SearchTool(BaseModel):
    """网络搜索工具"""
    query: str = Field(description="搜索查询词")
    max_results: int = Field(default=5, description="最大结果数")
 
class CalculatorTool(BaseModel):
    """计算器工具"""
    expression: str = Field(description="数学表达式")
 
# 工具 Schema 用于 LLM 函数调用
tools = [
    {
        "type": "function",
        "function": {
            "name": "search",
            "description": "搜索互联网获取最新信息",
            "parameters": SearchTool.model_json_schema()
        }
    },
    {
        "type": "function", 
        "function": {
            "name": "calculate",
            "description": "执行数学计算",
            "parameters": CalculatorTool.model_json_schema()
        }
    }
]

工具调用执行器

import json
from typing import Callable
 
class ToolExecutor:
    def __init__(self):
        self.tools: dict[str, Callable] = {}
    
    def register(self, name: str, func: Callable):
        self.tools[name] = func
    
    async def execute(self, tool_call: dict) -> Any:
        name = tool_call["function"]["name"]
        args = json.loads(tool_call["function"]["arguments"])
        
        if name not in self.tools:
            return {"error": f"Unknown tool: {name}"}
        
        try:
            result = await self.tools[name](**args)
            return result
        except Exception as e:
            return {"error": str(e)}

顺序执行 vs 并行执行

# 顺序执行:结果影响下一步决策
async def sequential_execution(steps: list[ToolCall]) -> list[Result]:
    results = []
    for step in steps:
        result = await executor.execute(step)
        results.append(result)
        if is_critical_error(result):
            break  # 提前终止
    return results
 
# 并行执行:独立任务加速完成
async def parallel_execution(steps: list[ToolCall]) -> list[Result]:
    tasks = [executor.execute(step) for step in steps]
    return await asyncio.gather(*tasks)

记忆系统

短期记忆(Conversation Memory)

class ConversationMemory:
    def __init__(self, max_turns: int = 10):
        self.messages: list[Message] = []
        self.max_turns = max_turns
    
    def add(self, role: str, content: str):
        self.messages.append(Message(role=role, content=content))
        # 滑动窗口,保持最近 N 轮对话
        if len(self.messages) > self.max_turns:
            self.messages = self.messages[-self.max_turns:]
    
    def get_context(self) -> list[dict]:
        return [{"role": m.role, "content": m.content} 
                for m in self.messages]

长期记忆(Semantic Memory)

class SemanticMemory:
    def __init__(self, vector_db: VectorDB):
        self.vector_db = vector_db
        self.collection = "long_term_memory"
    
    async def store(self, experience: str, metadata: dict):
        embedding = await embed_model.encode(experience)
        self.vector_db.upsert(
            collection=self.collection,
            points=[{
                "id": str(uuid.uuid4()),
                "vector": embedding,
                "payload": {"content": experience, **metadata}
            }]
        )
    
    async def recall(self, query: str, top_k: int = 5) -> list[dict]:
        query_embedding = await embed_model.encode(query)
        results = self.vector_db.search(
            collection=self.collection,
            vector=query_embedding,
            limit=top_k
        )
        return [r["payload"] for r in results]

多智能体系统

智能体协作模式

class MultiAgentOrchestrator:
    def __init__(self):
        self.agents: dict[str, Agent] = {}
        self.registered_tools: dict[str, Tool] = {}
    
    def register(self, name: str, agent: Agent, tools: list[Tool]):
        self.agents[name] = agent
        for tool in tools:
            self.registered_tools[f"{name}.{tool.name}"] = tool
    
    async def run_task(self, task: str, agent_roles: list[str]) -> str:
        # 1. 任务分解
        subtasks = await self.decompose(task, agent_roles)
        
        # 2. 并行执行独立子任务
        coroutines = [self.agents[role].execute(subtask) 
                      for role, subtask in subtasks.items()]
        results = await asyncio.gather(*coroutines)
        
        # 3. 结果聚合
        return self.aggregate(results)

通信协议

from enum import Enum
 
class MessageType(Enum):
    TASK = "task"           # 分配任务
    RESULT = "result"       # 返回结果
    ERROR = "error"         # 错误报告
    QUERY = "query"         # 查询信息
    RESPONSE = "response"   # 响应信息
 
@dataclass
class AgentMessage:
    from_agent: str
    to_agent: str
    type: MessageType
    content: Any
    conversation_id: str
    timestamp: datetime

生产实践

评估框架

from dataclasses import dataclass
 
@dataclass
class AgentEvaluation:
    task_success: float      # 任务成功率
    avg_steps: float         # 平均步数
    tool_usage_accuracy: float  # 工具使用准确率
    response_time_ms: float  # 响应时间
    cost_usd: float          # 成本
 
# 评估指标
def evaluate_agent(agent: Agent, tasks: list[Task]) -> AgentEvaluation:
    successes = 0
    total_steps = 0
    correct_tools = 0
    total_tools = 0
    
    for task in tasks:
        trace = agent.run(task)
        successes += trace.success
        total_steps += len(trace.steps)
        
        for step in trace.steps:
            total_tools += 1
            if step.tool == step.expected_tool:
                correct_tools += 1
    
    return AgentEvaluation(
        task_success=successes / len(tasks),
        avg_steps=total_steps / len(tasks),
        tool_usage_accuracy=correct_tools / total_tools,
        response_time_ms=trace.total_time_ms,
        cost_usd=trace.total_cost
    )

安全护栏(Guardrails)

class SafetyGuardrails:
    def __init__(self):
        self.allowed_tools: set[str] = set()
        self.deny_patterns: list[re.Pattern] = []
        self.max_iterations: int = 20
    
    def check_tool(self, tool_name: str) -> bool:
        if tool_name not in self.allowed_tools:
            return False
        return True
    
    def check_content(self, content: str) -> bool:
        for pattern in self.deny_patterns:
            if pattern.search(content):
                return False
        return True
    
    def check_iterations(self, count: int) -> bool:
        return count < self.max_iterations
 
# 使用示例
guardrails = SafetyGuardrails()
guardrails.allowed_tools = {"search", "calculate", "read_file", "write_file"}
guardrails.deny_patterns = [re.compile(r"rm\s+-rf\s+/"), re.compile(r"DROP\s+TABLE")]

成本管理

class CostTracker:
    def __init__(self, budget_usd: float):
        self.budget = budget_usd
        self.spent = 0.0
        self.requests = 0
    
    def estimate_cost(self, model: str, input_tokens: int, 
                     output_tokens: int) -> float:
        pricing = {
            "gpt-4o": (0.005, 0.015),   # ($/1K input, $/1K output)
            "claude-3-5-sonnet": (0.003, 0.015),
            "gpt-4o-mini": (0.00015, 0.0006),
        }
        if model not in pricing:
            return 0.0
        input_cost = input_tokens / 1000 * pricing[model][0]
        output_cost = output_tokens / 1000 * pricing[model][1]
        return input_cost + output_cost
    
    def check_budget(self, estimated_cost: float) -> bool:
        if self.spent + estimated_cost > self.budget:
            raise BudgetExceededError(f"Budget exceeded: ${self.spent:.2f}")
        return True

最佳实践

1. 工具设计原则

原则说明示例
单一职责每个工具做一件事search_web 而非 search_and_summarize
清晰命名动宾短语get_user_orders 而非 user_orders
完整描述说明输入输出和副作用文档字符串包含示例
幂等性多次调用结果一致get_current_time vs send_email

2. 错误处理策略

async def robust_execute(tool_call: ToolCall, 
                        max_retries: int = 3) -> Result:
    for attempt in range(max_retries):
        try:
            return await executor.execute(tool_call)
        except ToolUnavailableError:
            # 等待后重试(指数退避)
            await asyncio.sleep(2 ** attempt)
        except RateLimitError:
            # 切换到备用工具
            tool_call = fallback_tool(tool_call)
        except Exception as e:
            if attempt == max_retries - 1:
                return Result(success=False, error=str(e))
    
    return Result(success=False, error="Max retries exceeded")

3. 调试与可观测性

class AgentTracer:
    def __init__(self, service_name: str):
        self.logger = structured_logger(service_name)
    
    def trace_step(self, step: AgentStep):
        self.logger.info(
            "agent_step",
            step=step.number,
            tool=step.tool_used,
            input_tokens=step.input_tokens,
            output_tokens=step.output_tokens,
            latency_ms=step.latency_ms,
            reasoning=step.thought[:200]  # 截断推理过程
        )
    
    def trace_error(self, error: Exception, context: dict):
        self.logger.error(
            "agent_error",
            error_type=type(error).__name__,
            error_message=str(error),
            **context
        )

相关主题


参考资料