Convex Markov Games:凸偏好下的统一多智能体决策框架

1. 引言

多智能体强化学习(MARL)的核心挑战之一是如何定义和找到合理的均衡。传统方法通常假设智能体最大化期望累积奖励,但这种设定在许多实际场景中存在局限:

  • 公平性需求:多个智能体可能偏好不同的奖励分配方案
  • 安全约束:策略需要满足概率或确定性安全约束
  • 风险敏感:智能体可能对风险有不同偏好(如CVaR)
  • 多目标优化:需要同时优化多个可能冲突的目标

Gemp等人提出的Convex Markov Games (cMG) 框架1通过引入凸偏好(Convex Preferences)概念,为这些挑战提供了统一而优雅的解决方案。

2. 从经典Markov Games到cMG

2.1 经典Markov Games回顾

定义 2.1(Markov Game / Stochastic Game)
: 一个 人随机博弈(Markov Game)由元组 定义,其中:

  • :智能体集合
  • :联合状态空间
  • :智能体 的动作空间
  • :转移概率函数
  • :智能体 的即时奖励
  • :折扣因子

标准Nash均衡目标

2.2 cMG的核心洞察

cMG的核心洞察是:将每个智能体的偏好从标量奖励扩展为凸函数

定义 2.2(cMG中的智能体)
: 在cMG中,每个智能体 由以下组件定义:

  1. 偏好集 维偏好空间
  2. 线性效用函数 :将偏好映射为实数
  3. 累积分布函数 :状态-动作对到偏好的映射

2.3 形式化定义

定义 2.3(Convex Markov Game)1
: 一个凸马尔可夫博弈由元组 定义,其中:

  • :初始状态分布
  • :智能体 的偏好空间(凸集)
  • :在状态 执行联合动作 时,智能体 获得的偏好分布
  • 凸目标函数(偏好分布到实数的映射)

2.4 经典设置的统一

cMG框架可以统一以下经典设置:

经典设置cMG表示
标准MDP
Risk-sensitive MDP (CVaR)
安全MDP
Fair MDP(多智能体)

3. 凸偏好形式化

3.1 凸偏好的直觉

为什么需要凸性?

考虑一个多智能体场景:

  • 智能体A偏好高风险高回报
  • 智能体B偏好稳定但较低回报

传统方法无法表达这种非理性偏好,而cMG允许:

其中 是策略 对应的occupancy measure, 是奖励向量, 是内积。

3.2 Occupancy Measure框架

定义 3.1(Occupancy Measure)
: 给定策略 和转移核 ,occupancy measure 定义为:

引理 3.1(策略-Occupancy对应)
: 策略 和 occupancy measure 之间存在双射关系,当且仅当 满足流动性约束:

3.3 凸目标函数

定义 3.2(凸目标函数)
: 函数 是凸的,当对任意分布

常见凸目标函数

class ConvexObjective:
    """常见的凸目标函数"""
    
    @staticmethod
    def expectation(dist):
        """期望效用(线性)"""
        return sum(p * u for p, u in zip(dist.probs, dist.support))
    
    @staticmethod
    def cvar(alpha, dist):
        """CVaR (Conditional Value at Risk)"""
        sorted_outcomes = sorted(zip(dist.support, dist.probs))
        # 计算VaR
        cumsum = 0
        var = 0
        for u, p in sorted_outcomes:
            if cumsum + p >= alpha:
                var = u
                break
            cumsum += p
        # CVaR = E[X | X <= VaR]
        cvar = sum(u * p for u, p in sorted_outcomes if u <= var)
        return cvar
    
    @staticmethod
    def shannon_entropy(dist):
        """信息熵(凸 - 负熵为凹)"""
        return -sum(p * torch.log(p + 1e-8) for p in dist.probs)
    
    @staticmethod
    def wasserstein_barycenter(target_dist, dist):
        """Wasserstein重心(常用于公平性)"""
        # 简化版本
        return torch.norm(target_dist - dist, p=2)

3.4 统一目标形式

定理 3.1(cMG统一目标)1
: 在cMG框架下,智能体 的目标是:

其中 是所有平稳策略的集合。

4. 均衡存在性证明

4.1 均衡定义

定义 4.1(cMG-Nash均衡)
: 联合策略 是cMG的Nash均衡,当对每个智能体

其中 对应的occupancy measure。

4.2 主要存在性定理

定理 4.1(Nash均衡存在性)1
: 有限状态空间、有限动作空间的cMG至少存在一个Nash均衡。

证明概要

  1. 策略空间紧致性 是紧致凸集
  2. 目标函数凹性:对每个智能体, 的凹函数
  3. Debreu-Glicksberg-Fan定理:紧致凸集上的连续凹-凸博弈必有Nash均衡

4.3 关键引理

引理 4.1(策略到Occupancy的双射)
: 映射 是从 到流动性约束可行集 的双射。

引理 4.2(目标函数的表示不变性)
: 若 是凸的且线性表示不变(linearly representation invariant),则:

其中 是策略空间上的凸函数。

4.4 证明细节

def prove_nash_existence(cmg):
    """
    证明cMG中Nash均衡的存在性
    
    核心思路:使用Debreu-Glicksberg-Fan定理
    """
    # 1. 策略空间紧致性
    strategy_spaces = [
        ProbabilitySimplex(dim=cmg.action_dim_i)
        for i in cmg.n_agents
    ]
    product_space = ProductSpace(*strategy_spaces)  # 紧致
    
    # 2. 每个智能体的最优反应映射是上半连续的
    def best_response_map(pi_minus_i):
        """计算智能体i对其他智能体策略的最优反应"""
        def objective(pi_i):
            occupancy = compute_occupancy(pi_i, pi_minus_i, cmg)
            return -cmg.phi_i(occupancy)  # 最大化等价于最小化负
        
        return projected_gradient_descent(objective, ...)
    
    # 3. Kakutani不动点定理应用
    # 联合最优反应映射 Γ: Π → 2^Π
    def joint_br_mapping(pi):
        """联合最优反应(每个智能体独立)"""
        return CartesianProduct([
            best_response_map(i, pi_minus_i)
            for i in range(cmg.n_agents)
        ])
    
    # 4. 验证不动点定理条件
    # - 策略空间非空紧致凸 ✓
    # - 联合反应映射非空值、上半连续、凸图 ✓
    # → 存在不动点,即Nash均衡

5. 与Markov Decision Process的统一

5.1 MDP作为cMG的特殊情况

定理 5.1(MDP → cMG)
: 每个MDP可以表示为一个单智能体cMG,其中:

  • (标量奖励)
  • (确定性偏好)
  • (恒等函数)

5.2 逆向:cMG → 广义MDP

定理 5.2(cMG → 广义MDP)
: cMG可以视为一个广义MDP,其状态为 ,其中 是智能体 的累积偏好分布。

5.3 统一的算法框架

class UnifiedCMGAlgorithm:
    """
    统一的cMG算法框架
    
    可以实例化为:
    - 标准RL (phi = expectation)
    - Risk-sensitive RL (phi = CVaR)
    - Fair RL (phi = Gini coefficient)
    """
    
    def __init__(self, cmg, phi_type='expectation', **phi_params):
        self.cmg = cmg
        self.phi = self._build_phi(phi_type, **phi_params)
        
        # 统一的网络结构
        self.policy_net = nn.ModuleDict({
            f'agent_{i}': ActorCritic(state_dim, action_dim_i)
            for i in range(cmg.n_agents)
        })
        
        # Occupancy估计器
        self.occupancy_estimator = OccupancyEstimator(cmg)
    
    def _build_phi(self, phi_type, **params):
        """构建凸目标函数"""
        if phi_type == 'expectation':
            return lambda dist: torch.sum(dist * params.get('weights', 1.0))
        
        elif phi_type == 'cvar':
            alpha = params.get('alpha', 0.95)
            return lambda dist: ConvexObjective.cvar(alpha, dist)
        
        elif phi_type == 'entropy':
            return lambda dist: ConvexObjective.shannon_entropy(dist)
        
        elif phi_type == 'fairness':
            # 基于Wasserstein的公平性度量
            target = params.get('target_distribution')
            return lambda dist: -ConvexObjective.wasserstein_barycenter(target, dist)
    
    def update(self, batch):
        """统一的更新逻辑"""
        # 1. 估计occupancy measure
        rho = self.occupancy_estimator.estimate(batch)
        
        # 2. 计算偏好分布
        preference_dist = self._compute_preference_distribution(batch)
        
        # 3. 应用凸目标函数
        convex_value = self.phi(preference_dist)
        
        # 4. 策略梯度更新
        for i in range(self.cmg.n_agents):
            policy_loss = -convex_value
            self.optimizers[i].zero_grad()
            policy_loss.backward()
            self.optimizers[i].step()
        
        return convex_value.item()

6. 学习算法

6.1 Independent Policy Gradient方法

算法 6.1(cMG-IPG)
: 独立策略梯度方法(Independent Policy Gradient for cMG)

核心思想:每个智能体独立更新自己的策略,同时考虑其他智能体的策略变化。

class CMGIndependentPolicyGradient:
    """
    cMG独立策略梯度算法
    
    每个智能体独立执行策略梯度,同时维护对其他智能体的估计
    """
    
    def __init__(self, n_agents, state_dim, action_dims, gamma=0.99):
        self.n_agents = n_agents
        self.gamma = gamma
        
        # 每个智能体的策略和价值网络
        self.policies = nn.ModuleList([
            Actor(state_dim, action_dims[i])
            for i in range(n_agents)
        ])
        
        self.critics = nn.ModuleList([
            Critic(state_dim, action_dims, hidden=128)
            for _ in range(n_agents)
        ])
        
        self.optimizers = [
            torch.optim.Adam(policy.parameters(), lr=3e-4)
            for policy in self.policies
        ]
        
        self.critic_optimizers = [
            torch.optim.Adam(critic.parameters(), lr=1e-3)
            for critic in self.critics
        ]
    
    def compute_preference_features(self, states, actions):
        """
        计算偏好特征
        
        这是cMG的关键:将状态-动作对映射到偏好空间
        """
        features = []
        for i in range(self.n_agents):
            state_action = torch.cat([
                states[:, i] if states.dim() > 1 else states,
                actions[:, i].float()
            ], dim=-1)
            
            # 学习偏好嵌入
            pref = self.preference_encoder[i](state_action)
            features.append(pref)
        
        return torch.stack(features, dim=1)  # [B, N, d]
    
    def compute_convex_objective(self, preferences):
        """
        计算凸目标函数
        
        支持多种凸目标:期望、CVaR、Shannon熵等
        """
        # 将偏好转换为分布
        preference_dist = F.softmax(preferences, dim=-1)
        
        # 默认:线性期望
        utility = (preferences * preference_dist).sum(dim=-1)
        
        return utility
    
    def update_agent(self, agent_id, batch):
        """
        更新单个智能体的策略
        """
        states = batch['states']
        actions = batch['actions']
        rewards = batch['rewards']
        next_states = batch['next_states']
        dones = batch['dones']
        
        # ===== Critic更新 =====
        with torch.no_grad():
            next_prefs = self.compute_preference_features(
                next_states, 
                self._sample_actions(next_states)
            )
            next_values = self.compute_convex_objective(next_prefs)
            
            target = rewards[:, agent_id] + self.gamma * next_values * (1 - dones)
        
        current_prefs = self.compute_preference_features(states, actions)
        current_values = self.compute_convex_objective(current_prefs)
        
        critic_loss = F.mse_loss(current_values, target)
        
        self.critic_optimizers[agent_id].zero_grad()
        critic_loss.backward()
        self.critic_optimizers[agent_id].step()
        
        # ===== Actor更新 =====
        # 重新计算偏好(包含梯度)
        prefs = self.compute_preference_features(states, actions)
        
        # 最大化凸目标
        objective = self.compute_convex_objective(prefs)
        
        # 策略梯度
        log_probs = self.policies[agent_id].get_log_prob(
            states[:, agent_id] if states.dim() > 1 else states,
            actions[:, agent_id]
        )
        
        policy_loss = -(log_probs * objective.detach()).mean()
        
        self.optimizers[agent_id].zero_grad()
        policy_loss.backward()
        self.optimizers[agent_id].step()
        
        return {
            'critic_loss': critic_loss.item(),
            'policy_loss': policy_loss.item(),
            'objective': objective.mean().item()
        }
    
    def _sample_actions(self, states):
        """从当前策略采样动作"""
        actions = []
        for i in range(self.n_agents):
            logits = self.policies[i](
                states[:, i] if states.dim() > 1 else states
            )
            probs = F.softmax(logits, dim=-1)
            action = torch.multinomial(probs, 1).squeeze(-1)
            actions.append(action)
        return torch.stack(actions, dim=1)

6.2 收敛性分析

定理 6.1(cMG-IPG收敛性)1
: 在cMG-IPG中,若满足以下条件:

  1. 步长序列 满足
  2. 偏好特征映射是 -Lipschitz的
  3. 凸目标函数 -光滑的

则策略序列 以概率1收敛到cMG的Nash均衡。

6.3 扩展:集中式 Critic + 去中心化 Actor

class CMGCTDE:
    """
    cMG with Centralized Training, Decentralized Execution
    
    集中式训练阶段可以访问全局状态和所有动作,
    去中心化执行阶段只使用局部观察。
    """
    
    def __init__(self, n_agents, state_dim, action_dims, global_state_dim):
        # 去中心化Actor
        self.actors = nn.ModuleList([
            DecentralizedActor(state_dim, action_dims[i])
            for i in range(n_agents)
        ])
        
        # 中心化Critic(可访问全局信息)
        self.critic = CentralizedCritic(global_state_dim, action_dims)
        
        # 偏好编码器(集中式)
        self.preference_encoder = nn.Sequential(
            nn.Linear(global_state_dim + sum(action_dims), 128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, n_agents * preference_dim)  # 每个智能体一个偏好向量
        )
    
    def forward(self, global_state, local_obs, agent_id):
        """
        前向传播
        """
        # Actor使用局部观察
        action_logits = self.actors[agent_id](local_obs)
        
        # Critic使用全局状态
        q_value = self.critic(global_state)
        
        return action_logits, q_value
    
    def update(self, batch):
        """
        集中式更新
        """
        global_states = batch['global_states']
        local_obs = batch['local_obs']
        actions = batch['actions']
        rewards = batch['rewards']
        next_global_states = batch['next_global_states']
        
        # ===== 计算偏好 =====
        preference_features = self.preference_encoder(
            torch.cat([global_states, actions], dim=-1)
        )
        preference_features = preference_features.view(-1, self.n_agents, -1)
        
        # ===== 凸目标 =====
        convex_values = self.compute_convex_objective(preference_features)
        
        # ===== Critic更新 =====
        with torch.no_grad():
            next_prefs = self.preference_encoder(
                torch.cat([next_global_states, self._sample_actions(next_global_states)], dim=-1)
            ).view(-1, self.n_agents, -1)
            next_values = self.compute_convex_objective(next_prefs)
            target = rewards.sum(dim=1, keepdim=True) + self.gamma * next_values
        
        critic_loss = F.mse_loss(convex_values, target)
        
        # ===== Actor更新 =====
        for i in range(self.n_agents):
            actor_loss = -(convex_values[:, i:i+1].mean())
            
            self.actor_optimizers[i].zero_grad()
            actor_loss.backward(retain_graph=(i < self.n_agents - 1))
            self.actor_optimizers[i].step()

7. 应用场景

7.1 模仿学习

在模仿学习中,cMG框架可以优雅地处理多模态demonstrations

class CMGImitationLearning:
    """
    基于cMG框架的模仿学习
    
    处理多模态专家演示
    """
    
    def __init__(self, expert_trajectories, preference_dim=8):
        self.expert_trajectories = expert_trajectories
        
        # 学习专家的隐式偏好
        self.preference_encoder = PreferenceEncoder(
            state_dim, action_dim, latent_dim=preference_dim
        )
        
        # 凸目标:最大化与专家偏好的相似度
        self.phi = self._build_causal_entropic_Phi()
    
    def _build_causal_entropic_Phi(self):
        """
        因果熵凸目标
        
        鼓励学习与专家因果结构一致的策略
        """
        def causal_entropic_objective(preference_dist):
            # 因果熵:考虑动作之间的因果关系
            causal_entropy = self._compute_causal_entropy(preference_dist)
            
            # 期望奖励
            expected_reward = (preference_dist * self.reward_weights).sum()
            
            # 凸组合
            return expected_reward + 0.1 * causal_entropy
        
        return causal_entropic_objective
    
    def update(self, batch):
        """模仿学习更新"""
        # 估计occupancy measure
        rho = self._estimate_occupancy(batch)
        
        # 计算专家偏好分布
        expert_prefs = self._compute_expert_preferences(self.expert_trajectories)
        
        # 学习隐式偏好
        learned_prefs = self.preference_encoder(batch['states'], batch['actions'])
        
        # 最大化与专家偏好的相似度
        similarity = F.cosine_similarity(learned_prefs, expert_prefs)
        
        return -similarity.mean()

7.2 公平性约束

cMG可以自然地处理公平性约束

class CMGFairness:
    """
    基于cMG的公平多智能体学习
    """
    
    def __init__(self, fairness_type='egalitarian'):
        self.fairness_type = fairness_type
    
    def compute_fairness_objective(self, preferences, rewards):
        """
        计算公平性目标
        """
        if self.fairness_type == 'egalitarian':
            # 平等主义:最大化最小收益
            return preferences.min(dim=-1)[0]
        
        elif self.fairness_type == 'proportional':
            # 比例公平:最大化对数和
            return torch.log(preferences + 1e-8).sum(dim=-1)
        
        elif self.fairness_type == ' utilitarian':
            # 功利主义:最大化总收益
            return preferences.sum(dim=-1)
        
        elif self.fairness_type == 'rao':
            # 帕累托效率 + 公平性
            gini = self._compute_gini_coefficient(preferences)
            utilitarian = preferences.sum(dim=-1)
            return utilitarian - 0.1 * gini
    
    def _compute_gini_coefficient(self, x):
        """计算基尼系数"""
        sorted_x, _ = torch.sort(x, dim=-1)
        n = x.size(-1)
        cumsum = torch.cumsum(sorted_x, dim=-1)
        gini = (2 * torch.arange(1, n + 1, device=x.device).float() - n - 1) * cumsum / (n * cumsum[:, -1:])
        return gini.mean(dim=-1)

7.3 安全约束

class CMGSafeRL:
    """
    基于cMG的安全强化学习
    
    使用凸CVaR约束风险
    """
    
    def __init__(self, risk_level=0.95):
        self.risk_level = risk_level
    
    def safe_convex_objective(self, preferences, rewards, costs):
        """
        安全凸目标:最大化期望收益,同时约束CVaR(成本)
        """
        # 收益目标(最大化)
        reward_objective = preferences.mean(dim=-1)
        
        # 成本CVaR(约束)
        cost_cvar = self._compute_cvar(costs, self.risk_level)
        
        # Lagrangian形式
        lambda_safe = 0.1  # 安全权重
        constraint_violation = F.relu(cost_cvar - self.cost_limit)
        
        return reward_objective - lambda_safe * constraint_violation
    
    def _compute_cvar(self, costs, alpha):
        """计算条件在险价值 (CVaR)"""
        sorted_costs, _ = torch.sort(costs, dim=-1)
        
        # VaR
        var_idx = int(alpha * costs.size(-1))
        var = sorted_costs[:, var_idx]
        
        # CVaR
        cvar = (sorted_costs * (sorted_costs <= var.unsqueeze(-1)).float()).sum(dim=-1) / (sorted_costs <= var.unsqueeze(-1)).float().sum(dim=-1)
        
        return cvar

8. 与相关工作的比较

8.1 统一性比较

框架目标均衡偏好表达
cMG凸函数Nash任意凸偏好
Nash-VFI线性Nash仅期望
Mean Field Games线性MFG均衡仅期望
Potential Games势函数Nash势函数形式
Stochastic Games线性Nash/Coarse CE仅期望

8.2 算法复杂度

算法时间复杂度空间复杂度收敛速率
cMG-IPG
cMG-CTDE
Nash-VFI
Fictitious Play

8.3 扩展性

cMG的优势

  1. 模块化:目标函数可以灵活替换
  2. 凸优化工具:可利用成熟的凸优化算法
  3. 理论保障:凸分析提供坚实的理论基础

9. 总结与展望

9.1 主要贡献

  1. 统一框架:将多种MARL设置统一到凸偏好下
  2. 灵活偏好:支持期望、CVaR、公平性等多种目标
  3. 理论保证:提供均衡存在性和收敛性证明
  4. 实用算法:IPG和CTDE两种学习范式

9.2 未来方向

  • 与LLM结合:将语言模型作为偏好推理器
  • 层次化cMG:多尺度凸偏好建模
  • 在线cMG:处理非平稳环境和分布偏移
  • 鲁棒cMG:对抗性扰动下的鲁棒均衡

9.3 相关资源

参考文献

Footnotes

  1. Gemp et al. “Convex Markov Games: A Unifying Multi-Agent Decision Framework” ICML 2025 2 3 4 5