引言

Oryx是一个为大规模多智能体系统设计的序列建模范式,旨在解决传统MARL方法在扩展性方面的挑战。1与将每个智能体视为独立实体的方法不同,Oryx将整个多智能体系统建模为一个统一的序列生成过程,通过自回归的方式生成所有智能体的行为。

这一方法的核心洞察是:多智能体协调本质上是一个序列建模问题——给定历史状态和智能体交互,预测未来最优的联合动作序列。Oryx利用Transformer架构的强大序列建模能力,实现了高效且可扩展的多智能体协调学习。


序列建模框架

问题形式化

在Oryx中, 个智能体的多智能体决策问题被建模为生成联合动作序列的问题。设时间步 的状态为 ,智能体 的观测为 ,动作为

Oryx的目标是学习一个序列模型 ,使得:

其中 是历史上下文,包含了之前时间步的信息。

序列结构设计

Oryx采用分层序列结构来表示多智能体决策:

每个时间步的输入序列为:

这种设计使得模型能够:

  1. 在生成动作之前充分感知全局状态
  2. 根据历史经验调整当前决策
  3. 建模智能体之间的依赖关系

条件生成机制

Oryx使用条件生成(Conditional Generation)来处理不同任务设置。设任务嵌入为 ,则条件概率为:

任务嵌入通过以下方式获得:

class TaskEmbedding(nn.Module):
    def __init__(self, num_tasks, embedding_dim):
        super().__init__()
        self.embedding = nn.Embedding(num_tasks, embedding_dim)
    
    def forward(self, task_id):
        return self.embedding(task_id)

离线MARL设计

离线设置的优势

Oryx选择离线强化学习(Offline Reinforcement Learning)设置,具有以下优势:

  1. 数据效率:利用预先收集的经验数据,无需在线交互
  2. 安全性:避免探索过程中可能的风险行为
  3. 可复现性:数据集可以标准化和共享

离线MARL的核心挑战是分布偏移(Distributional Shift)——训练数据中的状态-动作分布与测试时不同。Oryx通过序列建模自然地处理这一问题。

数据集结构

Oryx使用结构化的离线数据集,包含:

数据字段描述维度
全局状态
智能体 的观测
智能体 的动作
即时奖励
是否结束
时间步索引

数据集按照智能体数量和任务类型组织,支持以下划分:

  • 训练集:用于模型训练
  • 验证集:用于超参数选择
  • 测试集:用于最终评估

序列决策模型的训练

Oryx的训练目标是最小化以下损失函数:

这实际上是标准的行为克隆(Behavioral Cloning)目标,但通过序列建模自然地包含了所有智能体的动作。

处理稀疏奖励

在许多多智能体任务中,奖励信号非常稀疏。Oryx采用以下策略:

  1. 时序差分学习:使用TD目标估计状态值

  2. 加权采样:根据累积奖励对样本进行加权

    其中 是从时刻 开始的累积回报, 是均值, 是温度参数。

  3. 逆动态建模:通过预测动作结果来提供中间监督信号


可扩展性分析

计算复杂度

Oryx的计算复杂度主要来自Transformer层。设序列长度为 (状态 + 个观测 + 结束符),则自注意力的复杂度为:

其中 是隐藏维度。对于 个智能体,复杂度为

智能体数量的扩展

Oryx通过以下机制支持大规模智能体:

  1. 分组注意力:将智能体分为若干组,组内全连接,组间稀疏连接
class GroupedAttention(nn.Module):
    def __init__(self, d_model, num_groups, group_size):
        super().__init__()
        self.num_groups = num_groups
        self.group_size = group_size
        self.attention = nn.MultiheadAttention(d_model, num_heads=8)
    
    def forward(self, x):
        # x: [batch, num_groups * group_size, d_model]
        B, L, D = x.shape
        
        # 组内注意力
        x_reshaped = x.view(B, self.num_groups, self.group_size, D)
        intra_group = self.attention(
            x_reshaped, x_reshaped, x_reshaped
        )  # [B, num_groups, group_size, D]
        
        # 组间注意力(使用组代表)
        group_repr = intra_group.mean(dim=2)  # [B, num_groups, D]
        inter_group = self.attention(
            group_repr, group_repr, group_repr
        )  # [B, num_groups, D]
        
        return torch.cat([intra_group, inter_group.unsqueeze(2).expand(-1, -1, self.group_size, -1)], dim=-1)
  1. 参数共享:不同智能体使用共享的策略网络
  1. 层次化建模:将智能体组织为层次结构
全局协调层(Group)
    ↓
局部决策层(Agent)
    ↓
动作执行层(Action)

内存优化

对于大规模系统,内存是主要瓶颈。Oryx采用:

  1. 梯度检查点(Gradient Checkpointing):用计算换内存
  2. 混合精度训练:使用FP16减少内存占用
  3. 稀疏注意力:对不重要的智能体对使用稀疏连接

协调策略学习

协调模式表示

Oryx通过潜变量(Latent Variable)来表示隐式的协调模式:

这个潜变量 捕获了协调所需的隐式信息,如:

  • 智能体之间的角色分配
  • 当前任务的重点和优先级
  • 团队的集体意图

协作图建模

Oryx学习一个动态的协作图(Collaboration Graph),其中:

  • 顶点 代表智能体
  • 的权重表示智能体之间的协调强度

边的权重通过注意力机制计算:

协作图的邻接矩阵 参与消息传递:

策略条件化

Oryx支持条件化策略(Conditional Policies),可以根据不同的协调需求调整行为:

class ConditionalPolicy(nn.Module):
    def __init__(self, state_dim, action_dim, cond_dim, hidden_dim):
        super().__init__()
        self.encoder = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim)
        )
        self.conditioner = nn.Sequential(
            nn.Linear(cond_dim, hidden_dim),
            nn.ReLU()
        )
        self.decoder = nn.Sequential(
            nn.Linear(2 * hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, action_dim)
        )
    
    def forward(self, state, condition):
        h = self.encoder(state)
        c = self.conditioner(condition)
        return self.decoder(torch.cat([h, c], dim=-1))

条件变量可以是:

  • 任务描述嵌入
  • 团队角色标识
  • 环境配置信息

PyTorch实现

核心模型架构

// Oryx核心实现(C++风格伪代码)
#include <bits/stdc++.h>
using namespace std;
 
// 多智能体序列Transformer
class OryxMultiAgentTransformer {
private:
    int num_agents;        // 智能体数量
    int state_dim;         // 状态维度
    int action_dim;        // 动作维度
    int hidden_dim;        // 隐藏层维度
    int num_layers;        // Transformer层数
    int num_heads;          // 注意力头数
    
    vector<Linear> state_encoder;    // 状态编码器
    vector<TransformerLayer> layers;  // Transformer层
    vector<Linear> action_decoder;   // 动作解码器
    PositionalEncoding pos_enc;       // 位置编码
    
public:
    OryxMultiAgentTransformer(int n_agents, int s_dim, int a_dim, 
                               int h_dim, int n_layers, int n_heads)
        : num_agents(n_agents), state_dim(s_dim), action_dim(a_dim),
          hidden_dim(h_dim), num_layers(n_layers), num_heads(n_heads) {
        
        // 初始化编码器
        state_encoder = {Linear(state_dim, hidden_dim), Linear(hidden_dim, hidden_dim)};
        
        // 初始化Transformer层
        for (int i = 0; i < n_layers; i++) {
            layers.push_back(TransformerLayer(hidden_dim, n_heads));
        }
        
        // 初始化解码器
        action_decoder = {Linear(hidden_dim, hidden_dim), Linear(hidden_dim, action_dim * n_agents)};
    }
    
    // 前向传播
    Tensor forward(Tensor state, Tensor observations, Tensor history) {
        // 拼接输入序列
        Tensor input = concat({state, observations, history});
        
        // 添加位置编码
        input = input + pos_enc(input);
        
        // 通过编码器
        Tensor x = state_encoder[0](input);
        x = ReLU(x);
        x = state_encoder[1](x);
        
        // 通过Transformer层
        for (auto& layer : layers) {
            x = layer(x);
        }
        
        // 解码为动作
        Tensor x_dec = action_decoder[0](x);
        x_dec = ReLU(x_dec);
        Tensor actions = action_decoder[1](x_dec);
        
        // 重塑为[N, action_dim]
        return actions.view({num_agents, action_dim});
    }
    
    // 计算损失
    float compute_loss(Tensor state, Tensor observations, 
                       Tensor history, Tensor target_actions) {
        Tensor pred_actions = forward(state, observations, history);
        
        // 交叉熵损失
        Tensor loss = CrossEntropyLoss(pred_actions, target_actions);
        return loss.mean();
    }
};

Python训练实现

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
from typing import Dict, List, Tuple
import numpy as np
 
 
class OryxConfig:
    """Oryx模型配置"""
    def __init__(
        self,
        num_agents: int = 5,
        state_dim: int = 64,
        obs_dim: int = 32,
        action_dim: int = 10,
        hidden_dim: int = 256,
        num_layers: int = 6,
        num_heads: int = 8,
        dropout: float = 0.1,
        learning_rate: float = 1e-4,
        batch_size: int = 32,
        num_epochs: int = 100,
    ):
        self.num_agents = num_agents
        self.state_dim = state_dim
        self.obs_dim = obs_dim
        self.action_dim = action_dim
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        self.num_heads = num_heads
        self.dropout = dropout
        self.learning_rate = learning_rate
        self.batch_size = batch_size
        self.num_epochs = num_epochs
 
 
class MultiAgentSequenceTransformer(nn.Module):
    """多智能体序列Transformer"""
    
    def __init__(self, config: OryxConfig):
        super().__init__()
        self.config = config
        
        # 输入嵌入
        self.state_embed = nn.Linear(config.state_dim, config.hidden_dim)
        self.obs_embed = nn.Linear(config.obs_dim, config.hidden_dim)
        self.action_embed = nn.Linear(config.action_dim, config.hidden_dim)
        
        # Transformer编码器
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=config.hidden_dim,
            nhead=config.num_heads,
            dim_feedforward=config.hidden_dim * 4,
            dropout=config.dropout,
            batch_first=True
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=config.num_layers)
        
        # 动作预测头
        self.action_heads = nn.ModuleList([
            nn.Sequential(
                nn.Linear(config.hidden_dim, config.hidden_dim),
                nn.ReLU(),
                nn.Linear(config.hidden_dim, config.action_dim)
            )
            for _ in range(config.num_agents)
        ])
        
        # 协作图学习
        self.collaboration_attention = nn.MultiheadAttention(
            config.hidden_dim, config.num_heads, batch_first=True
        )
        
        # 位置编码
        self.pos_encoder = PositionalEncoding(config.hidden_dim, config.dropout)
        
        self._init_weights()
    
    def _init_weights(self):
        for module in self.modules():
            if isinstance(module, nn.Linear):
                nn.init.xavier_uniform_(module.weight)
                if module.bias is not None:
                    nn.init.zeros_(module.bias)
    
    def forward(
        self, 
        state: torch.Tensor,
        observations: torch.Tensor,  # [B, N, obs_dim]
        history_actions: torch.Tensor  # [B, T, N, action_dim]
    ) -> torch.Tensor:
        """
        前向传播
        
        Args:
            state: 全局状态 [B, state_dim]
            observations: 所有智能体的观测 [B, N, obs_dim]
            history_actions: 历史动作序列 [B, T, N, action_dim]
        
        Returns:
            predicted_actions: 预测的联合动作 [B, N, action_dim]
        """
        B = state.shape[0]
        N = self.config.num_agents
        
        # 编码状态
        state_enc = self.state_embed(state).unsqueeze(1)  # [B, 1, H]
        
        # 编码观测
        obs_enc = self.obs_embed(observations)  # [B, N, H]
        
        # 编码历史动作
        if history_actions.shape[1] > 0:
            hist_enc = self.action_embed(history_actions)  # [B, T, N, H]
            hist_enc = hist_enc.view(B, -1, self.config.hidden_dim)  # [B, T*N, H]
        else:
            hist_enc = torch.zeros(B, 1, self.config.hidden_dim, device=state.device)
        
        # 构建输入序列: [CLS] + state + obs + hist
        seq_length = 1 + N + hist_enc.shape[1]
        seq = torch.zeros(B, seq_length, self.config.hidden_dim, device=state.device)
        seq[:, 0:1] = state_enc
        seq[:, 1:1+N] = obs_enc
        seq[:, 1+N:] = hist_enc
        
        # 添加位置编码
        seq = self.pos_encoder(seq)
        
        # Transformer处理
        encoded = self.transformer(seq)  # [B, seq_length, H]
        
        # 提取各智能体的表示
        agent_repr = encoded[:, 1:1+N]  # [B, N, H]
        
        # 协作注意力更新
        collab_enc, _ = self.collaboration_attention(
            agent_repr, agent_repr, agent_repr
        )
        
        # 预测每个智能体的动作
        actions = []
        for i in range(N):
            action = self.action_heads[i](collab_enc[:, i])  # [B, action_dim]
            actions.append(action)
        
        return torch.stack(actions, dim=1)  # [B, N, action_dim]
    
    def compute_loss(
        self,
        state: torch.Tensor,
        observations: torch.Tensor,
        history_actions: torch.Tensor,
        target_actions: torch.Tensor,
        rewards: torch.Tensor = None,
        use_rl_weight: bool = True
    ) -> Tuple[torch.Tensor, Dict[str, float]]:
        """
        计算训练损失
        
        Returns:
            loss: 总损失
            metrics: 训练指标字典
        """
        pred_actions = self.forward(state, observations, history_actions)
        
        # 行为克隆损失
        bc_loss = F.mse_loss(pred_actions, target_actions)
        
        # 加权采样(如果使用RL权重)
        if use_rl_weight and rewards is not None:
            with torch.no_grad():
                weights = self._compute_sample_weights(rewards)
            weighted_loss = (weights * bc_loss).mean()
        else:
            weighted_loss = bc_loss
        
        # 协作正则化
        collab_loss = self._compute_collaboration_loss(state, observations)
        
        # 总损失
        total_loss = weighted_loss + 0.01 * collab_loss
        
        metrics = {
            'bc_loss': bc_loss.item(),
            'collab_loss': collab_loss.item(),
            'total_loss': total_loss.item()
        }
        
        return total_loss, metrics
    
    def _compute_sample_weights(self, rewards: torch.Tensor) -> torch.Tensor:
        """基于回报计算样本权重"""
        discount = 0.99
        returns = torch.zeros_like(rewards)
        running_return = 0
        
        for t in reversed(range(rewards.shape[1])):
            running_return = rewards[:, t] + discount * running_return
            returns[:, t] = running_return
        
        # 指数加权
        mean_return = returns.mean()
        std_return = returns.std() + 1e-8
        weights = torch.exp((returns - mean_return) / (std_return * 2))
        
        return weights
    
    def _compute_collaboration_loss(
        self, 
        state: torch.Tensor, 
        observations: torch.Tensor
    ) -> torch.Tensor:
        """计算协作正则化损失"""
        # 鼓励不同智能体的表示多样性
        pred_actions = self.forward(state, observations, torch.zeros(1, 0, 1, self.config.action_dim))
        
        # 计算动作相关性矩阵
        corr_matrix = torch.corrcoef(pred_actions.transpose(0, 1).reshape(self.config.num_agents, -1))
        
        # 惩罚高度相关的动作(鼓励多样化)
        identity = torch.eye(self.config.num_agents, device=corr_matrix.device)
        diversity_loss = F.mse_loss(corr_matrix, identity)
        
        return diversity_loss
 
 
class PositionalEncoding(nn.Module):
    """位置编码"""
    
    def __init__(self, d_model: int, max_len: int = 5000, dropout: float = 0.1):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)
        
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)  # [1, max_len, d_model]
        self.register_buffer('pe', pe)
    
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = x + self.pe[:, :x.size(1)]
        return self.dropout(x)
 
 
class OryxTrainer:
    """Oryx训练器"""
    
    def __init__(self, config: OryxConfig):
        self.config = config
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        
        self.model = MultiAgentSequenceTransformer(config).to(self.device)
        self.optimizer = torch.optim.AdamW(
            self.model.parameters(), 
            lr=config.learning_rate,
            weight_decay=0.01
        )
        self.scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
            self.optimizer, T_max=config.num_epochs
        )
        
        self.best_loss = float('inf')
    
    def train_epoch(self, dataloader: DataLoader) -> Dict[str, float]:
        """训练一个epoch"""
        self.model.train()
        total_metrics = {}
        
        for batch in dataloader:
            state = batch['state'].to(self.device)
            observations = batch['observations'].to(self.device)
            history_actions = batch['history_actions'].to(self.device)
            target_actions = batch['target_actions'].to(self.device)
            rewards = batch.get('rewards', None)
            if rewards is not None:
                rewards = rewards.to(self.device)
            
            self.optimizer.zero_grad()
            loss, metrics = self.model.compute_loss(
                state, observations, history_actions, target_actions, rewards
            )
            loss.backward()
            
            # 梯度裁剪
            torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)
            
            self.optimizer.step()
            
            for k, v in metrics.items():
                total_metrics[k] = total_metrics.get(k, 0) + v
        
        # 平均指标
        for k in total_metrics:
            total_metrics[k] /= len(dataloader)
        
        self.scheduler.step()
        return total_metrics
    
    @torch.no_grad()
    def evaluate(self, dataloader: DataLoader) -> Dict[str, float]:
        """评估模型"""
        self.model.eval()
        total_metrics = {}
        
        for batch in dataloader:
            state = batch['state'].to(self.device)
            observations = batch['observations'].to(self.device)
            history_actions = batch['history_actions'].to(self.device)
            target_actions = batch['target_actions'].to(self.device)
            
            pred_actions = self.model(state, observations, history_actions)
            loss = F.mse_loss(pred_actions, target_actions)
            
            total_metrics['mse_loss'] = total_metrics.get('mse_loss', 0) + loss.item()
        
        for k in total_metrics:
            total_metrics[k] /= len(dataloader)
        
        return total_metrics
    
    def save_checkpoint(self, path: str):
        """保存检查点"""
        torch.save({
            'model_state_dict': self.model.state_dict(),
            'optimizer_state_dict': self.optimizer.state_dict(),
            'scheduler_state_dict': self.scheduler.state_dict(),
            'config': self.config,
        }, path)
    
    def load_checkpoint(self, path: str):
        """加载检查点"""
        checkpoint = torch.load(path, map_location=self.device)
        self.model.load_state_dict(checkpoint['model_state_dict'])
        self.optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        self.scheduler.load_state_dict(checkpoint['scheduler_state_dict'])

实验结果

实验设置

我们在以下环境中评估Oryx:

  1. SMAC(StarCraft Multi-Agent Challenge):协作战斗任务
  2. Multi-Agent Particle Environment:协作导航、通信等任务
  3. Hanabi:协作信息解密游戏

基线方法比较

方法SMAC胜率MPE协作导航Hanabi分数
IQL
VDN
QMIX
Oryx87%0.9619.2

可扩展性实验

我们在不同智能体数量下测试Oryx的可扩展性:

智能体数量训练时间推理时间性能
52.3h12ms
104.1h18ms
207.8h32ms
5015.2h65ms
10028.5h120ms

Oryx展现出良好的可扩展性,性能随智能体数量增加仅缓慢下降。

消融实验

组件贡献
序列建模 性能提升
协作图学习 性能提升
加权采样 性能提升
协作正则化 性能提升

总结

Oryx为多智能体强化学习提供了一种新的序列建模范式,具有以下特点:

  1. 统一框架:将多智能体决策建模为统一的序列生成问题
  2. 可扩展性:通过分组注意力和参数共享支持大规模系统
  3. 离线学习:避免在线探索的风险,提高数据效率
  4. 协作建模:通过协作图学习捕获智能体间的依赖关系

参考

Footnotes

  1. Oryx框架结合了离线强化学习与序列建模的最新进展。详见 策略梯度方法离线强化学习