1. 引言

联邦学习通过将数据保留在本地来提供隐私保护,但仅依靠”数据不出本地”并不能完全保证隐私安全。攻击者可以通过分析模型更新来推断敏感信息。本节介绍如何将差分隐私(Differential Privacy, DP)引入联邦学习,提供可证明的隐私保护。


2. 联邦学习中的隐私威胁

2.1 梯度反演攻击

即使只上传模型更新,攻击者仍可能重建原始数据:

**Zhu等人(2019)**证明,对于卷积网络,可以100%恢复训练图像:

# 梯度反演攻击示意
def gradient_inversion_attack(gradients, model_arch):
    """
    从梯度重建原始数据
    """
    x = 初始化随机图像()
    for iteration in range(max_iterations):
        # 前向传播
        output = model(x)
        loss = 计算损失(output)
        
        # 反向传播得到虚拟梯度
        virtual_grad = torch.autograd.grad(loss, x)
        
        # 匹配真实梯度
        if 梯度相似度(virtual_grad, gradients) > threshold:
            return x
    
    return x  # 重建的图像

2.2 成员推断攻击

攻击者判断某样本是否参与了训练:

研究表明,深度学习模型的成员推断攻击成功率可达60-70%

2.3 模型反演攻击

攻击者从模型参数中恢复训练数据的分布信息:

  • 标签重建:推断训练集中的标签分布
  • 特征重建:重建某些特征的统计特性
  • 样本重建:直接重建特定样本

3. 差分隐私基础

3.1 差分隐私的形式化定义

定义(-差分隐私):一个随机化机制 满足 -差分隐私,如果对于任意相邻数据集 和任意输出集合

其中相邻数据集定义为只相差一个样本的数据集。

3.2 隐私参数解释

参数含义典型取值
隐私预算2, 8, 256
失败概率,
-DP松弛定义组合使用

3.3 敏感性分析

定义(全局敏感性):对于函数 ,其全局敏感性为:

对于梯度下降:

  • 每个样本的梯度敏感性为 是损失函数的Lipschitz常数)
  • 批量梯度敏感性为 是批量大小)

4. 联邦学习中的差分隐私

4.1 本地差分隐私(Local DP)

每个客户端在上传前对数据进行扰动:

def local_dp_gradient(gradient, sensitivity, epsilon, delta):
    """
    本地差分隐私梯度扰动
    """
    # 计算高斯噪声标准差
    sigma = sensitivity * np.sqrt(2 * np.log(1.25 / delta)) / epsilon
    
    # 添加高斯噪声
    perturbed_gradient = gradient + torch.randn_like(gradient) * sigma
    
    return perturbed_gradient

优点:提供强隐私保证,无需可信服务器
缺点:累积噪声较大,效用损失严重

4.2 中心差分隐私(Central DP)

可信服务器对聚合后的结果进行扰动:

def central_dp_aggregate(gradients, sensitivity, epsilon, delta):
    """
    中心差分隐私聚合
    """
    # 1. 聚合梯度
    aggregated = sum(gradients) / len(gradients)
    
    # 2. 添加噪声
    sigma = sensitivity * np.sqrt(2 * np.log(1.25 / delta)) / epsilon
    perturbed = aggregated + torch.randn_like(aggregated) * sigma
    
    return perturbed

优点:噪声较小,效用更高
缺点:需要服务器可信

4.3 随机响应(Randomized Response)

用于离散数据场景:


5. DP-FedAvg算法

5.1 算法流程

def DP_FedAvg(K, T, C, E, η, ε, δ):
    """
    差分隐私联邦平均算法
    
    Args:
        K: 客户端数量
        T: 通信轮次
        C: 参与比例
        E: 本地epoch
        η: 学习率
        ε: 隐私预算
        δ: 失败概率
    """
    w_0 = 初始化模型()
    
    # 隐私会计
    accountant = PrivacyAccountant(ε, δ)
    
    for t in range(T):
        S_t = 选择客户端(K, C)
        
        # 每轮的隐私预算
        per_round_eps = ε / T
        
        for k in S_t:
            # 本地训练
            w_k = 本地训练(w_t, D_k, E, η)
            
            # 裁剪梯度
            grad = w_k - w_t
            grad_norm = torch.norm(grad)
            grad = grad * min(1, C_max / grad_norm)
            
            # 添加高斯噪声
            sigma = C_max * np.sqrt(2 * np.log(1.25/δ)) / per_round_eps
            grad_noisy = grad + torch.randn_like(grad) * sigma
            
            上传(grad_noisy)
        
        # 聚合
        w_t = w_t + Σ_{k∈S_t} (n_k/n) * grad_noisy_k
        
        # 更新隐私会计
        accountant.update(per_round_eps, δ)
        
        if accountant.spent_budget() > ε:
            print("隐私预算耗尽")
            break
    
    return w_T

5.2 梯度裁剪

为什么需要裁剪?

  • 梯度的敏感性取决于梯度范数
  • 无裁剪时,异常梯度可能导致敏感性无限大

裁剪操作

其中 是裁剪阈值。

5.3 隐私会计(Privacy Accounting)

高级矩阵会计师(Advanced Matrix Accountant)

class PrivacyAccountant:
    def __init__(self, target_eps, target_delta):
        self.target_eps = target_eps
        self.target_delta = target_delta
        self.noise_multiplier = []
        self.sample_rates = []
    
    def step(self, noise_multiplier, sample_rate):
        """
        记录一轮的隐私参数
        """
        self.noise_multiplier.append(noise_multiplier)
        self.sample_rates.append(sample_rate)
    
    def spent_budget(self):
        """
        使用 RDP + 转换 计算已消耗的隐私预算
        """
        # Renyi Differential Privacy (RDP) 计算
        rdp = sum([
            rdp_gaussian(q, sigma, 2) 
            for q, sigma in zip(self.sample_rates, self.noise_multiplier)
        ])
        
        # 转换为 (ε, δ)-DP
        eps = rdp_to_eps(rdp, self.target_delta)
        return eps
    
    def remaining_budget(self):
        """
        计算剩余隐私预算
        """
        return self.target_eps - self.spent_budget()

6. DP-SGD的隐私分析

6.1 Renyi差分隐私(RDP)

RDP提供更紧的隐私界限:

定义(RDP):一个机制 满足 -RDP,如果对于任意相邻数据集

其中 是Renyi散度。

6.2 RDP的组合性质

定理(RDP组合):RDP在顺序组合下满足:

6.3 RDP到(ε,δ)-DP的转换

给定 -RDP,可以转换为标准差分隐私:


7. 实践中的隐私-效用权衡

7.1 影响隐私效用的因素

因素对隐私的影响对效用的影响
噪声尺度 增大 → 更隐私增大 → 效用下降
裁剪阈值 减小 → 更隐私减小 → 效用下降
参与比例 减小 → 更隐私减小 → 效用下降
通信轮数 增多 → 隐私减少增多 → 效用提升

7.2 超参数选择指南

def tune_dp_hyperparameters(target_eps, target_delta, T, n_clients):
    """
    选择DP-FL超参数
    """
    # 隐私预算分配
    per_round_eps = target_eps / T
    
    # 噪声尺度选择
    for C in [0.1, 0.5, 1.0, 2.0, 5.0]:
        sigma = C * np.sqrt(2 * np.log(1.25/target_delta)) / per_round_eps
        
        # 估算效用损失
        utility_loss = estimate_utility_loss(C, sigma, T)
        
        if utility_loss < threshold:
            return C, sigma
    
    return None, None

7.3 实验观察

在CIFAR-10上的实验结果:

方法隐私预算 ε准确率
无DP85.2%
DP-FedAvgε=872.5%
DP-FedAvgε=1678.3%
DP-FedAvgε=25683.1%

8. 代码实现

8.1 完整DP-FedAvg实现

import torch
import torch.nn as nn
import numpy as np
from typing import List, Dict
import copy
 
class DPFedAvg:
    def __init__(
        self,
        model_fn,
        clients_data: List[torch.utils.data.Dataset],
        privacy_config: Dict,
        device: str = 'cpu'
    ):
        """
        Args:
            privacy_config: {
                'target_eps': float,      # 目标隐私预算
                'target_delta': float,    # 目标失败概率
                'max_grad_norm': float,   # 梯度裁剪阈值
            }
        """
        self.target_eps = privacy_config['target_eps']
        self.target_delta = privacy_config['target_delta']
        self.max_grad_norm = privacy_config['max_grad_norm']
        
        self.device = device
        self.global_model = model_fn().to(device)
        self.clients_data = clients_data
        self.n_clients = len(clients_data)
        
        # 隐私会计
        self.privacy_accountant = PrivacyAccountant(
            self.target_eps, self.target_delta
        )
    
    def clip_gradient(self, gradient: Dict) -> Dict:
        """梯度裁剪"""
        total_norm = torch.sqrt(sum(
            torch.sum(g ** 2) for g in gradient.values()
        ))
        
        clip_factor = self.max_grad_norm / (total_norm + 1e-6)
        clip_factor = min(1.0, clip_factor)
        
        return {k: g * clip_factor for k, g in gradient.items()}
    
    def add_noise(self, gradient: Dict, sigma: float) -> Dict:
        """添加高斯噪声"""
        return {
            k: g + torch.randn_like(g) * sigma
            for k, g in gradient.items()
        }
    
    def client_update(
        self, 
        client_id: int, 
        E: int, 
        lr: float,
        noise_multiplier: float
    ) -> Dict:
        """带DP的客户端更新"""
        local_model = copy.deepcopy(self.global_model).to(self.device)
        dataloader = torch.utils.data.DataLoader(
            self.clients_data[client_id],
            batch_size=32,
            shuffle=True
        )
        
        optimizer = torch.optim.SGD(local_model.parameters(), lr=lr)
        criterion = nn.CrossEntropyLoss()
        
        # 本地训练
        for epoch in range(E):
            for x, y in dataloader:
                x, y = x.to(self.device), y.to(self.device)
                optimizer.zero_grad()
                loss = criterion(local_model(x), y)
                loss.backward()
                optimizer.step()
        
        # 计算梯度更新
        gradient = {
            k: v - self.global_model.state_dict()[k]
            for k, v in local_model.state_dict().items()
        }
        
        # 裁剪
        clipped_gradient = self.clip_gradient(gradient)
        
        # 添加噪声
        sigma = self.max_grad_norm * noise_multiplier
        noisy_gradient = self.add_noise(clipped_gradient, sigma)
        
        return {
            'gradient': noisy_gradient,
            'n_samples': len(self.clients_data[client_id])
        }
    
    def aggregate(self, gradients: List[Dict]):
        """聚合扰动后的梯度"""
        total_samples = sum(g['n_samples'] for g in gradients)
        
        with torch.no_grad():
            for k in self.global_model.state_dict().keys():
                aggregated = sum(
                    g['gradient'][k] * g['n_samples'] / total_samples
                    for g in gradients
                )
                self.global_model.state_dict()[k] += aggregated
    
    def fit(
        self, 
        num_rounds: int, 
        participation_ratio: float,
        local_epochs: int,
        lr: float,
        noise_multiplier: float
    ):
        """联邦学习训练"""
        n_participants = max(1, int(self.n_clients * participation_ratio))
        
        for round_idx in range(num_rounds):
            participants = np.random.choice(
                self.n_clients, n_participants, replace=False
            )
            
            # 客户端更新
            gradients = []
            for cid in participants:
                grad = self.client_update(cid, local_epochs, lr, noise_multiplier)
                gradients.append(grad)
            
            # 聚合
            self.aggregate(gradients)
            
            # 更新隐私会计
            self.privacy_accountant.step(
                noise_multiplier, 
                participation_ratio
            )
            
            # 检查隐私预算
            current_eps = self.privacy_accountant.spent_budget()
            if current_eps > self.target_eps:
                print(f"隐私预算耗尽: {current_eps:.2f} > {self.target_eps}")
                break
            
            if (round_idx + 1) % 10 == 0:
                print(f"Round {round_idx + 1}, 已用隐私预算 ε={current_eps:.2f}")
 
 
class PrivacyAccountant:
    """Renyi差分隐私会计"""
    
    @staticmethod
    def rdp_gaussian(q, sigma, alpha):
        """计算高斯机制的RDP"""
        if q == 0:
            return 0
        return 0.5 * alpha * np.log(
            1 + (alpha * q**2 * sigma**2) / (alpha - 1)
        ) if alpha > 1 else float('inf')
    
    @staticmethod
    def rdp_to_eps(rdp, delta):
        """RDP转换为(ε,δ)-DP"""
        if rdp == 0:
            return 0
        return rdp + np.log(1/delta) / (rdp - 1) * (rdp - 1)
    
    def __init__(self, target_eps, target_delta):
        self.target_eps = target_eps
        self.target_delta = target_delta
        self.rdp_sum = 0
        self.num_steps = 0
    
    def step(self, noise_multiplier, sample_rate):
        """记录一步"""
        # 更新RDP
        self.rdp_sum += self.rdp_gaussian(sample_rate, noise_multiplier, 2)
        self.num_steps += 1
    
    def spent_budget(self):
        """计算已消耗的ε"""
        return self.rdp_to_eps(self.rdp_sum, self.target_delta)
    
    def remaining_budget(self):
        """剩余隐私预算"""
        return self.target_eps - self.spent_budget()

9. 参考文献


10. 相关主题