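## Overview

The multilayer perceptron (MLP) is one of the most fundamental architectures in deep learning. This document is a complete, from-scratch implementation guide covering:

  1. NumPy implementation from scratch: understand every mathematical detail
  2. Full PyTorch implementation: a production-style training workflow
  3. Initialization schemes: Xavier, He, LSUV, orthogonal initialization
  4. Regularization techniques: Dropout, BatchNorm, Weight Decay, LayerNorm
  5. Optimizers: SGD, Adam, AdamW
  6. Learning-rate schedules: Step, Cosine, Warmup
  7. End-to-end training pipeline: MNIST/Fashion-MNIST classification

The MLP is the "atom" of deep learning architectures: fully connected layers reappear inside CNNs, RNNs, and Transformers, so a solid grasp of its implementation details makes those architectures much easier to understand.[^1]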


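## 1. Mathematical Foundations of the MLP

### 1.1 Network Structure

An $L$-layer MLP computes, for $l = 1, \dots, L$:

$$
\mathbf{a}^{(0)} = \mathbf{x}, \qquad
\mathbf{z}^{(l)} = \mathbf{W}^{(l)} \mathbf{a}^{(l-1)} + \mathbf{b}^{(l)}, \qquad
\mathbf{a}^{(l)} = \sigma^{(l)}\left(\mathbf{z}^{(l)}\right), \qquad
\hat{\mathbf{y}} = \mathbf{a}^{(L)}
$$

where:

  • $\mathbf{x}$ is the input
  • $\mathbf{W}^{(l)}$ is the weight matrix of layer $l$
  • $\mathbf{b}^{(l)}$ is the bias of layer $l$
  • $\sigma^{(l)}$ is the activation function of layer $l$
  • $\mathbf{a}^{(L)}$ is the output
  • $\hat{\mathbf{y}}$ is the predicted probability vector (a softmax output for classification)

The NumPy code in Section 2 stores samples as rows, so it computes `Z = A @ W + b` with `W` of shape `(fan_in, fan_out)`; this is the transpose of the column-vector form above.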
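
To make the layer recursion concrete, the following is a minimal pure-Python sketch of one forward pass through a tiny 3-2-2 network. The weights, the input, and the helper names `affine`, `relu`, and `softmax` are illustrative assumptions, not part of the implementation in Section 2.

```python
import math


def affine(W, a, b):
    """Return z = W a + b for a weight matrix W given as a list of rows."""
    return [sum(w * x for w, x in zip(row, a)) + b_i for row, b_i in zip(W, b)]


def relu(z):
    """Element-wise ReLU."""
    return [max(0.0, v) for v in z]


def softmax(z):
    """Numerically stable softmax (subtract the max before exponentiating)."""
    shift = max(z)
    exps = [math.exp(v - shift) for v in z]
    total = sum(exps)
    return [e / total for e in exps]


# Hypothetical 3-2-2 network with hand-picked weights
x = [1.0, -2.0, 0.5]
W1 = [[0.2, -0.1, 0.4], [-0.3, 0.5, 0.1]]
b1 = [0.0, 0.1]
W2 = [[1.0, -1.0], [0.5, 0.5]]
b2 = [0.0, 0.0]

z1 = affine(W1, x, b1)   # pre-activation of the hidden layer
a1 = relu(z1)            # hidden activation
z2 = affine(W2, a1, b2)  # logits of the output layer
y_hat = softmax(z2)      # predicted probabilities

print("z1 =", z1)
print("a1 =", a1)
print("z2 =", z2)
print("y_hat =", y_hat)
```

The second hidden unit has a negative pre-activation, so ReLU zeroes it, and the two output probabilities sum to one.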

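### 1.2 Loss Functions

Cross-entropy loss (multi-class classification), for $m$ samples, $K$ classes, and one-hot targets $y_{ik}$:

$$
\mathcal{L}_{\text{CE}} = -\frac{1}{m} \sum_{i=1}^{m} \sum_{k=1}^{K} y_{ik} \log \hat{y}_{ik}
$$

Mean squared error (regression), averaged over all $m \cdot K$ entries as `compute_loss` in Section 2 does:

$$
\mathcal{L}_{\text{MSE}} = \frac{1}{mK} \sum_{i=1}^{m} \sum_{k=1}^{K} \left(\hat{y}_{ik} - y_{ik}\right)^2
$$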
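
As a quick sanity check of the two losses, here is a small pure-Python sketch that evaluates them on a hand-made batch of two samples. The function names and the sample values are assumptions chosen for illustration; the clipping constant mirrors the one used in the NumPy code in Section 2.

```python
import math


def cross_entropy(y_pred, y_true, eps=1e-15):
    """Mean cross-entropy over samples for one-hot targets."""
    total = 0.0
    for p_row, t_row in zip(y_pred, y_true):
        for p, t in zip(p_row, t_row):
            p = min(max(p, eps), 1.0 - eps)  # clip to avoid log(0)
            total -= t * math.log(p)
    return total / len(y_true)


def mean_squared_error(y_pred, y_true):
    """Mean of the squared errors over every entry."""
    squares = [(p - t) ** 2
               for p_row, t_row in zip(y_pred, y_true)
               for p, t in zip(p_row, t_row)]
    return sum(squares) / len(squares)


# Two samples, three classes (illustrative values)
y_true = [[1.0, 0.0, 0.0], [0.0, 1.0, 0.0]]
y_pred = [[0.7, 0.2, 0.1], [0.1, 0.8, 0.1]]

ce = cross_entropy(y_pred, y_true)
mse = mean_squared_error(y_pred, y_true)
print(f"cross-entropy = {ce:.4f}, mse = {mse:.4f}")
```

Only the probability assigned to the true class enters the cross-entropy, so here it equals $-(\ln 0.7 + \ln 0.8)/2$.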

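### 1.3 Backpropagation

By the chain rule, the gradients of the loss with respect to the parameters of layer $l$ are (for a single sample):

$$
\frac{\partial \mathcal{L}}{\partial \mathbf{W}^{(l)}} = \boldsymbol{\delta}^{(l)} \left(\mathbf{a}^{(l-1)}\right)^{\top}, \qquad
\frac{\partial \mathcal{L}}{\partial \mathbf{b}^{(l)}} = \boldsymbol{\delta}^{(l)}
$$

where $\boldsymbol{\delta}^{(l)} = \partial \mathcal{L} / \partial \mathbf{z}^{(l)}$ is the error signal.

Recurrence relation (the first equation holds for a softmax output with cross-entropy loss):

$$
\boldsymbol{\delta}^{(L)} = \hat{\mathbf{y}} - \mathbf{y}, \qquad
\boldsymbol{\delta}^{(l)} = \left( \left(\mathbf{W}^{(l+1)}\right)^{\top} \boldsymbol{\delta}^{(l+1)} \right) \odot \sigma'^{(l)}\left(\mathbf{z}^{(l)}\right)
$$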
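
The output-layer error for softmax with cross-entropy, $\hat{\mathbf{y}} - \mathbf{y}$, can be verified numerically. The sketch below compares it against central finite differences in pure Python; the logits, the target, and the step size `h` are illustrative assumptions.

```python
import math


def softmax(z):
    shift = max(z)
    exps = [math.exp(v - shift) for v in z]
    total = sum(exps)
    return [e / total for e in exps]


def ce_loss(z, y):
    """Cross-entropy of softmax(z) against a one-hot target y."""
    return -sum(t * math.log(p) for p, t in zip(softmax(z), y))


z = [1.0, -0.5, 0.3]  # logits (illustrative)
y = [0.0, 1.0, 0.0]   # one-hot target

# Analytic error signal: delta = y_hat - y
analytic = [p - t for p, t in zip(softmax(z), y)]

# Central finite differences of the loss with respect to each logit
h = 1e-6
numeric = []
for k in range(len(z)):
    z_plus, z_minus = list(z), list(z)
    z_plus[k] += h
    z_minus[k] -= h
    numeric.append((ce_loss(z_plus, y) - ce_loss(z_minus, y)) / (2 * h))

max_err = max(abs(a - n) for a, n in zip(analytic, numeric))
print("analytic:", analytic)
print("numeric: ", numeric)
print("max abs difference:", max_err)
```

The same finite-difference idea extends to a full gradient check of any layer: perturb one weight, recompute the loss, and compare against the backpropagated gradient.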


## 2. NumPy Implementation from Scratch

### 2.1 Full Implementation

import numpy as np
 
 
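class MLP:
    """Multilayer perceptron implemented from scratch in NumPy."""
    
    def __init__(self, layer_dims, activations, init_method='he',
                 dropout_rate=0.0, weight_decay=0.0):
        """
        Args:
            layer_dims: size of every layer, e.g. [784, 256, 128, 10]
            activations: one activation name per layer, e.g. ['relu', 'relu', 'softmax']
            init_method: weight initialization scheme, e.g. 'xavier' or 'he'
            dropout_rate: probability of dropping a hidden unit
            weight_decay: L2 regularization coefficient
        """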
        self.num_layers = len(layer_dims) - 1
        self.layer_dims = layer_dims
        self.activations = activations
        self.dropout_rate = dropout_rate
        self.weight_decay = weight_decay
        
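        # Initialize parameters
        self.parameters = {}
        self._initialize_parameters(init_method)
        
        # Training-mode flag (controls dropout)
        self.training = True
    
    def _initialize_parameters(self, method):
        """Initialize weights with the chosen scheme and biases with zeros."""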
        for l in range(1, self.num_layers + 1):
            fan_in = self.layer_dims[l - 1]
            fan_out = self.layer_dims[l]
            
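            if method == 'xavier':
                # Xavier/Glorot (normal): Var(W) = 2 / (fan_in + fan_out)
                scale = np.sqrt(2.0 / (fan_in + fan_out))
            elif method == 'he':
                # He/Kaiming: Var(W) = 2 / fan_in (for ReLU)
                scale = np.sqrt(2.0 / fan_in)
            elif method in ('lecun', 'xavier_normal'):
                # Var(W) = 1 / fan_in is LeCun-normal scaling; the original key
                # 'xavier_normal' is kept as an alias for backward compatibility
                scale = np.sqrt(1.0 / fan_in)
            else:
                # Fallback: small fixed standard deviation
                scale = 0.01
            
            # Weights have shape (fan_in, fan_out) because samples are stored as rows
            self.parameters[f'W{l}'] = np.random.randn(fan_in, fan_out) * scale
            self.parameters[f'b{l}'] = np.zeros((1, fan_out))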
    
    def _activate(self, z, activation):
        """Apply the named activation function."""
        if activation == 'relu':
            return np.maximum(0, z)
        elif activation == 'sigmoid':
            return 1.0 / (1.0 + np.exp(-np.clip(z, -500, 500)))
        elif activation == 'tanh':
            return np.tanh(z)
        elif activation == 'softmax':
            z_shifted = z - np.max(z, axis=1, keepdims=True)
            exp_z = np.exp(z_shifted)
            return exp_z / np.sum(exp_z, axis=1, keepdims=True)
        elif activation == 'leaky_relu':
            return np.where(z > 0, z, 0.01 * z)
        elif activation == 'identity':
            return z
        else:
            raise ValueError(f"Unknown activation: {activation}")
    
    def _activate_derivative(self, z, activation):
        """Element-wise derivative of the activation with respect to z."""
        if activation == 'relu':
            return (z > 0).astype(float)
        elif activation == 'sigmoid':
            s = self._activate(z, 'sigmoid')
            return s * (1 - s)
        elif activation == 'tanh':
            return 1 - np.tanh(z) ** 2
        elif activation == 'leaky_relu':
            return np.where(z > 0, 1.0, 0.01)
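        elif activation in ('softmax', 'identity'):
            # The softmax derivative is folded into the cross-entropy gradient in backward()
            return np.ones_like(z)
        else:
            raise ValueError(f"Unknown activation: {activation}")
    
    def forward(self, X):
        """Forward pass; caches Z and A of every layer for backward()."""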
        self.cache = {'A0': X}
        self.dropout_masks = {}
        
        A = X
        for l in range(1, self.num_layers + 1):
            W = self.parameters[f'W{l}']
            b = self.parameters[f'b{l}']
            activation = self.activations[l - 1]
            
            # Affine transform
            Z = A @ W + b
            self.cache[f'Z{l}'] = Z
            
            # Activation
            A = self._activate(Z, activation)
            
            # Inverted dropout (hidden layers only, never the output layer)
            if self.training and self.dropout_rate > 0 and l < self.num_layers:
                mask = (np.random.rand(*A.shape) > self.dropout_rate).astype(float)
                A = A * mask / (1 - self.dropout_rate)
                self.dropout_masks[f'D{l}'] = mask
            
            self.cache[f'A{l}'] = A
        
        return A
    
    def compute_loss(self, Y_pred, Y_true):
        """Cross-entropy for a softmax output, mean squared error otherwise."""
        m = Y_true.shape[0]
        
        if self.activations[-1] == 'softmax':
            # Cross-entropy loss
            epsilon = 1e-15
            Y_pred_clipped = np.clip(Y_pred, epsilon, 1 - epsilon)
            loss = -np.mean(np.sum(Y_true * np.log(Y_pred_clipped), axis=1))
        else:
            # Mean squared error over all entries
            loss = np.mean((Y_pred - Y_true) ** 2)
        
        # L2 regularization
        if self.weight_decay > 0:
            reg_loss = 0
            for l in range(1, self.num_layers + 1):
                reg_loss += np.sum(self.parameters[f'W{l}'] ** 2)
            loss += 0.5 * self.weight_decay * reg_loss / m
        
        return loss
    
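    def backward(self, Y_pred, Y_true):
        """Backward pass; returns gradients keyed 'dW{l}' and 'db{l}'."""
        m = Y_true.shape[0]
        gradients = {}
        
        # Output-layer error
        if self.activations[-1] == 'softmax':
            # Combined softmax + cross-entropy gradient with respect to Z
            dA = (Y_pred - Y_true) / m
        else:
            # MSE gradient; divide by Y_true.size to match np.mean in compute_loss
            dA = 2 * (Y_pred - Y_true) / Y_true.size
        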
        for l in reversed(range(1, self.num_layers + 1)):
            W = self.parameters[f'W{l}']
            Z = self.cache[f'Z{l}']
            A_prev = self.cache[f'A{l-1}']
            activation = self.activations[l - 1]
            
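            # Backpropagate through dropout: same mask and scaling as in forward()
            if f'D{l}' in self.dropout_masks:
                dA = dA * self.dropout_masks[f'D{l}'] / (1 - self.dropout_rate)
            
            # Gradient through the activation
            if activation == 'softmax':
                # Already combined with the cross-entropy gradient
                dZ = dA
            else:
                dZ = dA * self._activate_derivative(Z, activation)
            
            # Parameter gradients (the L2 term matches compute_loss)
            gradients[f'dW{l}'] = A_prev.T @ dZ + self.weight_decay * W / m
            gradients[f'db{l}'] = np.sum(dZ, axis=0, keepdims=True)
            
            # Propagate to the previous layer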
            dA = dZ @ W.T
        
        return gradients
    
    def update_parameters(self, gradients, learning_rate):
        """Plain mini-batch gradient descent update."""
        for l in range(1, self.num_layers + 1):
            self.parameters[f'W{l}'] -= learning_rate * gradients[f'dW{l}']
            self.parameters[f'b{l}'] -= learning_rate * gradients[f'db{l}']
    
    def train(self, X, Y, X_val=None, Y_val=None, epochs=100, batch_size=32,
              learning_rate=0.001, verbose=True):
        """Mini-batch training loop; returns the loss/accuracy history."""
        history = {'train_loss': [], 'val_loss': [], 'val_acc': []}
        n_samples = X.shape[0]
        
        for epoch in range(epochs):
            # Shuffle the training set every epoch
            indices = np.random.permutation(n_samples)
            X_shuffled = X[indices]
            Y_shuffled = Y[indices]
            
            epoch_loss = 0
            n_batches = 0
            
            for i in range(0, n_samples, batch_size):
                X_batch = X_shuffled[i:i+batch_size]
                Y_batch = Y_shuffled[i:i+batch_size]
                
                # Forward pass
                Y_pred = self.forward(X_batch)
                
                # Loss
                loss = self.compute_loss(Y_pred, Y_batch)
                epoch_loss += loss
                n_batches += 1
                
                # Backward pass
                gradients = self.backward(Y_pred, Y_batch)
                
                # Parameter update
                self.update_parameters(gradients, learning_rate)
            
            avg_loss = epoch_loss / n_batches
            history['train_loss'].append(avg_loss)
            
            # Validation (dropout disabled)
            if X_val is not None:
                self.training = False
                Y_val_pred = self.forward(X_val)
                val_loss = self.compute_loss(Y_val_pred, Y_val)
                val_acc = np.mean(np.argmax(Y_val_pred, axis=1) == np.argmax(Y_val, axis=1))
                history['val_loss'].append(val_loss)
                history['val_acc'].append(val_acc)
                self.training = True
                
                if verbose and (epoch + 1) % 10 == 0:
                    print(f"Epoch {epoch+1}/{epochs} - "
                          f"train_loss: {avg_loss:.4f} - "
                          f"val_loss: {val_loss:.4f} - "
                          f"val_acc: {val_acc:.4f}")
            elif verbose and (epoch + 1) % 10 == 0:
                print(f"Epoch {epoch+1}/{epochs} - train_loss: {avg_loss:.4f}")
        
        return history
    
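    def predict(self, X):
        """Return predicted class indices (dropout disabled)."""
        was_training = self.training
        self.training = False
        Y_pred = self.forward(X)
        self.training = was_training  # restore the previous mode
        return np.argmax(Y_pred, axis=1)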
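
The `'he'` branch of `_initialize_parameters` uses a standard deviation of $\sqrt{2 / \text{fan\_in}}$ so that the mean square of ReLU activations stays roughly constant from layer to layer. The following pure-Python sketch illustrates this on a random ReLU stack and contrasts it with the fixed `0.01` fallback; the width, depth, batch size, and seed are arbitrary assumptions, and the helper `forward_mean_square` is hypothetical.

```python
import math
import random


def forward_mean_square(scale_fn, width=64, depth=5, batch=8, seed=0):
    """Push Gaussian inputs through `depth` ReLU layers and return the
    mean square of the final activations."""
    rng = random.Random(seed)
    acts = [[rng.gauss(0.0, 1.0) for _ in range(width)] for _ in range(batch)]
    for _ in range(depth):
        scale = scale_fn(width)
        # W[j][i]: weight from input unit j to output unit i (fan_in x fan_out)
        W = [[rng.gauss(0.0, scale) for _ in range(width)] for _ in range(width)]
        acts = [
            [max(0.0, sum(a[j] * W[j][i] for j in range(width))) for i in range(width)]
            for a in acts
        ]
    squares = [v * v for a in acts for v in a]
    return sum(squares) / len(squares)


he_ms = forward_mean_square(lambda fan_in: math.sqrt(2.0 / fan_in))
small_ms = forward_mean_square(lambda fan_in: 0.01)

print(f"He init:      mean square after 5 layers = {he_ms:.3e}")
print(f"scale = 0.01: mean square after 5 layers = {small_ms:.3e}")
```

With He scaling the activations keep the same order of magnitude as the input, whereas the fixed small scale shrinks them by several orders of magnitude per layer, which is one way vanishing signals arise in deep ReLU networks.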

### 2.2 Training and Testing on MNIST

def load_mnist_simple():
    """Simplified MNIST loader (requires torchvision)."""
    from torchvision import datasets, transforms
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,)),
        transforms.Lambda(lambda x: x.view(-1))  # flatten to a 784-dim vector
    ])
    train_data = datasets.MNIST('./data', train=True, download=True, transform=transform)
    test_data = datasets.MNIST('./data', train=False, transform=transform)
    
    X_train = np.stack([d[0].numpy() for d in train_data])
    Y_train = np.eye(10)[np.array([d[1] for d in train_data])]
    X_test = np.stack([d[0].numpy() for d in test_data])
    Y_test = np.eye(10)[np.array([d[1] for d in test_data])]
    
    return X_train, Y_train, X_test, Y_test
 
 
# Train the NumPy MLP
X_train, Y_train, X_test, Y_test = load_mnist_simple()
 
mlp = MLP(
    layer_dims=[784, 256, 128, 10],
    activations=['relu', 'relu', 'softmax'],
    init_method='he',
    dropout_rate=0.2,
    weight_decay=1e-4
)
 
history = mlp.train(
    X_train[:50000], Y_train[:50000],
    X_val=X_train[50000:], Y_val=Y_train[50000:],
    epochs=30,
    batch_size=64,
    learning_rate=0.001
)
 
# Evaluate on the test set
test_acc = np.mean(mlp.predict(X_test) == np.argmax(Y_test, axis=1))
print(f"Test accuracy: {test_acc:.4f}")

## 3. Full PyTorch Implementation

### 3.1 Modular MLP

import torch
import torch.nn as nn
import torch.nn.functional as F
 
 
class LinearBlock(nn.Module):
    """Linear layer + normalization + activation + dropout."""
    
    def __init__(self, in_features, out_features, activation='relu',
                 use_batchnorm=True, dropout_rate=0.0, init_method='he'):
        super().__init__()
        self.linear = nn.Linear(in_features, out_features)
        self.use_bn = use_batchnorm
        
        if use_batchnorm:
            self.bn = nn.BatchNorm1d(out_features)
        
        # Activation
        if activation == 'relu':
            self.activation = nn.ReLU(inplace=True)
        elif activation == 'leaky_relu':
            self.activation = nn.LeakyReLU(0.01, inplace=True)
        elif activation == 'gelu':
            self.activation = nn.GELU()
        elif activation == 'silu':
            self.activation = nn.SiLU()
        elif activation == 'tanh':
            self.activation = nn.Tanh()
        elif activation == 'identity':
            self.activation = nn.Identity()
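        else:
            raise ValueError(f"Unknown activation: {activation}")
        
        # Dropout
        self.dropout = nn.Dropout(dropout_rate) if dropout_rate > 0 else nn.Identity()
        
        # Weight initialization
        self._init_weights(init_method, activation)
    
    def _init_weights(self, method, activation):
        """Initialize the linear layer's weights and zero its bias."""
        if method == 'he':
            # He/Kaiming initialization, with the gain matched to the activation
            if activation == 'relu':
                nn.init.kaiming_normal_(self.linear.weight, nonlinearity='relu')
            elif activation == 'leaky_relu':
                # Negative slope 0.01 matches nn.LeakyReLU(0.01) above
                nn.init.kaiming_normal_(self.linear.weight, a=0.01, nonlinearity='leaky_relu')
            elif activation in ('gelu', 'silu'):
                # GELU/SiLU resemble ReLU, so the ReLU gain is reused (a common heuristic)
                nn.init.kaiming_normal_(self.linear.weight, nonlinearity='relu')
            else:
                # tanh/identity: fall back to Xavier initialization
                nn.init.xavier_normal_(self.linear.weight)
        elif method == 'xavier':
            nn.init.xavier_normal_(self.linear.weight)
        elif method == 'orthogonal':
            nn.init.orthogonal_(self.linear.weight, gain=1.0)
        elif method == 'lsuv':
            # LSUV needs a data batch, so it is applied later (see lsuv_init);
            # here the layer is only flagged
            self._lsuv = True
        
        # Biases start at zero
        if self.linear.bias is not None:
            nn.init.zeros_(self.linear.bias)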
    
    def forward(self, x):
        out = self.linear(x)
        if self.use_bn:
            out = self.bn(out)
        out = self.activation(out)
        out = self.dropout(out)
        return out
 
 
class MLP(nn.Module):
    """Complete multilayer perceptron built from LinearBlock modules."""
    
    def __init__(self, input_dim, hidden_dims, output_dim,
                 activation='relu', output_activation='identity',
                 use_batchnorm=True, dropout_rate=0.0,
                 init_method='he', weight_decay=0.0):
        super().__init__()
        self.input_dim = input_dim
        self.output_dim = output_dim
        # Stored for reference only: PyTorch applies weight decay in the optimizer
        self.weight_decay = weight_decay
        
        # Build the hidden layers
        layers = []
        prev_dim = input_dim
        
        for hidden_dim in hidden_dims:
            layers.append(LinearBlock(
                in_features=prev_dim,
                out_features=hidden_dim,
                activation=activation,
                use_batchnorm=use_batchnorm,
                dropout_rate=dropout_rate,
                init_method=init_method
            ))
            prev_dim = hidden_dim
        
        # Output layer
        output_layer = nn.Linear(prev_dim, output_dim)
        if init_method == 'he':
            nn.init.kaiming_normal_(output_layer.weight)
        else:
            nn.init.xavier_normal_(output_layer.weight)
        nn.init.zeros_(output_layer.bias)
        layers.append(output_layer)
        
        # Output activation
        if output_activation == 'softmax':
            layers.append(nn.Softmax(dim=-1))
        elif output_activation == 'log_softmax':
            layers.append(nn.LogSoftmax(dim=-1))
        elif output_activation == 'sigmoid':
            layers.append(nn.Sigmoid())
        
        self.network = nn.Sequential(*layers)
    
    def forward(self, x):
        # Flatten inputs such as (N, 1, 28, 28) to (N, 784)
        if x.dim() > 2:
            x = x.view(x.size(0), -1)
        return self.network(x)
 
 
# Example
model = MLP(
    input_dim=784,
    hidden_dims=[512, 256, 128],
    output_dim=10,
    activation='relu',
    output_activation='log_softmax',
    use_batchnorm=True,
    dropout_rate=0.3,
    init_method='he'
)
print(model)
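
As a rough cross-check of the architecture above, the trainable parameter count can be worked out by hand. The sketch below assumes every hidden block contributes one biased linear layer plus a BatchNorm scale and shift, and that the output layer is a plain biased linear layer; `count_mlp_parameters` is a hypothetical helper, not part of the model code.

```python
def count_mlp_parameters(input_dim, hidden_dims, output_dim, use_batchnorm=True):
    """Return (linear_params, batchnorm_params) for a fully connected stack."""
    dims = [input_dim, *hidden_dims, output_dim]
    # Each linear layer has fan_in * fan_out weights plus fan_out biases
    linear = sum(fan_in * fan_out + fan_out
                 for fan_in, fan_out in zip(dims[:-1], dims[1:]))
    # BatchNorm adds a learnable scale and shift per hidden feature
    batchnorm = sum(2 * h for h in hidden_dims) if use_batchnorm else 0
    return linear, batchnorm


linear_params, bn_params = count_mlp_parameters(784, [512, 256, 128], 10)
total_params = linear_params + bn_params
print(f"linear: {linear_params:,}  batchnorm: {bn_params:,}  total: {total_params:,}")
```

Under these assumptions the total should agree with `sum(p.numel() for p in model.parameters())` for the example model; the first layer alone (784 to 512) accounts for roughly 70% of the parameters.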

### 3.2 LSUV Initialization

Layer-Sequential Unit-Variance (LSUV) is a practical initialization method proposed by Mishkin & Matas (2015):

def lsuv_init(model, data_batch, target_std=1.0, target_mean=0.0, max_iter=10, tol=1e-3):
    """
    LSUV-style initialization: rescale each layer in forward order until its
    output has (approximately) the target standard deviation on `data_batch`.
    
    Only the variance-scaling step is performed here; the weights are assumed
    to have been pre-initialized already (e.g. with orthogonal initialization).
    """
    was_training = model.training
    model.eval()
    
    # Forward hooks capture the latest output of every Linear/Conv2d layer
    outputs = {}
    handles = []
    
    def hook(name):
        def fn(module, input, output):
            outputs[name] = output.detach()
        return fn
    
    layers = [(name, module) for name, module in model.named_modules()
              if isinstance(module, (nn.Conv2d, nn.Linear))]
    for name, module in layers:
        handles.append(module.register_forward_hook(hook(name)))
    
    with torch.no_grad():
        # Adjust layers in forward order so earlier fixes feed later layers
        for name, module in layers:
            for _ in range(max_iter):
                model(data_batch)  # refresh `outputs` with the current weights
                out = outputs[name]
                current_std = out.std().item()
                current_mean = out.mean().item()
                
                std_ok = abs(current_std - target_std) < tol
                mean_ok = abs(current_mean - target_mean) < tol
                if std_ok and mean_ok:
                    break
                
                # Rescale the weights toward the target standard deviation
                if not std_ok:
                    module.weight.mul_(target_std / (current_std + 1e-8))
                
                # Shift the bias toward the target mean
                if not mean_ok and module.bias is not None:
                    module.bias.add_(target_mean - current_mean)
    
    # Remove the hooks and restore the previous train/eval mode
    for handle in handles:
        handle.remove()
    
    model.train(was_training)

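## 4. Regularization Techniques

### 4.1 Dropout

**Principle**: during training, a random subset of unit outputs is set to zero; at inference all units are used. To keep the expected activation unchanged, a compensating scale is applied either at inference time (classic dropout) or during training (inverted dropout).

**Mathematical form** (inverted dropout with drop probability $p$)

During training:

$$
\mathbf{m}^{(l)} \sim \text{Bernoulli}(1 - p), \qquad
\tilde{\mathbf{a}}^{(l)} = \frac{\mathbf{m}^{(l)} \odot \mathbf{a}^{(l)}}{1 - p}
$$

At inference:

$$
\tilde{\mathbf{a}}^{(l)} = \mathbf{a}^{(l)}
$$

**Inverted Dropout** (used by PyTorch): training divides by $(1-p)$, so inference needs no scaling.

```python
import torch


class DropoutAnalysis:
    """Small experiments that illustrate how dropout behaves."""

    @staticmethod
    def expected_value_test():
        """Check that inverted dropout leaves the expected value unchanged."""
        p = 0.5
        x = torch.ones(10000, 100)
        mask = (torch.rand_like(x) > p).float() / (1 - p)
        output = (x * mask).mean()
        # The mean should be close to 1
        print(f"Dropout(p={p}) output mean: {output.item():.4f} (expected 1)")

    @staticmethod
    def variance_test():
        """Effect of dropout on gradient variance (placeholder)."""
        # Training: the random masks add gradient noise, which acts as a regularizer
        # Inference: no masks are sampled, so the output is deterministic
        pass


# Run the check
DropoutAnalysis.expected_value_test()
```

### 4.2 Batch Normalization

**Principle**: normalize every feature over the mini-batch to zero mean and unit variance, then apply a learned scale and shift.

**During training**: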

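$$
\hat{x}_i = \frac{x_i - \mu_B}{\sqrt{\sigma_B^2 + \epsilon}}, \quad y_i = \gamma \hat{x}_i + \beta
$$

where $\mu_B$ and $\sigma_B^2$ are the mean and variance of the feature over the current mini-batch.

**At inference**: the moving averages of the mean and variance collected during training are used instead:

$$
\hat{x} = \frac{x - \mu_{\text{moving}}}{\sqrt{\sigma_{\text{moving}}^2 + \epsilon}}
$$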
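
The training and inference formulas above can be traced by hand for a single feature. The pure-Python sketch below assumes $\gamma = 1$, $\beta = 0$, a momentum of 0.1, and that the moving variance tracks the unbiased batch variance (the convention of `nn.BatchNorm1d`); the function names and the sample batch are illustrative.

```python
import math


def batchnorm_train_step(batch, running_mean, running_var, momentum=0.1, eps=1e-5):
    """Normalize one feature with batch statistics and update the moving averages."""
    n = len(batch)
    mu = sum(batch) / n
    var = sum((v - mu) ** 2 for v in batch) / n  # biased variance, used to normalize
    x_hat = [(v - mu) / math.sqrt(var + eps) for v in batch]
    unbiased_var = var * n / (n - 1)             # assumption: tracked like nn.BatchNorm1d
    new_mean = (1 - momentum) * running_mean + momentum * mu
    new_var = (1 - momentum) * running_var + momentum * unbiased_var
    return x_hat, new_mean, new_var


def batchnorm_eval(x, running_mean, running_var, eps=1e-5):
    """Normalize a single value with the moving averages (inference mode)."""
    return (x - running_mean) / math.sqrt(running_var + eps)


batch = [2.0, 4.0, 6.0, 8.0]
x_hat, running_mean, running_var = batchnorm_train_step(batch, 0.0, 1.0)
print("normalized batch:", [round(v, 4) for v in x_hat])
print("running mean:", running_mean, "running var:", round(running_var, 4))
print("eval output for x=6:", round(batchnorm_eval(6.0, running_mean, running_var), 4))
```

After a single step the moving averages are still far from the batch statistics (mean 0.5 versus a batch mean of 5), so training-mode and inference-mode outputs differ noticeably early in training.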

**Advantages of BatchNorm**:

1. Allows larger learning rates
2. Reduces sensitivity to initialization
3. Has a regularizing effect

```python
class BatchNorm1dManual(nn.Module):
    """Manual BatchNorm implementation, written to expose the details."""

    def __init__(self, num_features, eps=1e-5, momentum=0.1):
        super().__init__()
        self.eps = eps
        self.momentum = momentum

        self.gamma = nn.Parameter(torch.ones(num_features))
        self.beta = nn.Parameter(torch.zeros(num_features))

        self.register_buffer('running_mean', torch.zeros(num_features))
        self.register_buffer('running_var', torch.ones(num_features))

    def forward(self, x):
        if self.training:
            # Batch statistics (biased variance is used for normalization)
            mean = x.mean(dim=0)
            var = x.var(dim=0, unbiased=False)

            # Update the moving averages in place; like nn.BatchNorm1d,
            # the running variance tracks the unbiased estimate
            with torch.no_grad():
                n = x.size(0)
                unbiased_var = var * n / max(n - 1, 1)
                self.running_mean.mul_(1 - self.momentum).add_(self.momentum * mean)
                self.running_var.mul_(1 - self.momentum).add_(self.momentum * unbiased_var)
        else:
            mean = self.running_mean
            var = self.running_var

        # Normalize, then scale and shift
        x_hat = (x - mean) / torch.sqrt(var + self.eps)
        return self.gamma * x_hat + self.beta
```

### 4.3 Layer Normalization

**Difference from BatchNorm**: normalization runs over the **feature dimension** of each sample rather than over the batch dimension.

$$
\hat{x}_{ij} = \frac{x_{ij} - \mu_i}{\sqrt{\sigma_i^2 + \epsilon}}, \quad y_{ij} = \gamma_j \hat{x}_{ij} + \beta_j
$$

where $\mu_i = \frac{1}{d} \sum_{j=1}^{d} x_{ij}$ and $\sigma_i^2 = \frac{1}{d} \sum_{j=1}^{d} (x_{ij} - \mu_i)^2$ are computed over the $d$ features of sample $i$.

**Advantages**:

- Insensitive to batch size
- Suitable for variable-length sequences (RNN/Transformer)
- Identical behavior in training and inference

```python
class LayerNormManual(nn.Module):
    """Manual LayerNorm implementation."""

    def __init__(self, normalized_shape, eps=1e-5):
        super().__init__()
        self.eps = eps
        self.gamma = nn.Parameter(torch.ones(normalized_shape))
        self.beta = nn.Parameter(torch.zeros(normalized_shape))

    def forward(self, x):
        # Statistics over the last (feature) dimension of every sample
        mean = x.mean(dim=-1, keepdim=True)
        var = x.var(dim=-1, keepdim=True, unbiased=False)
        x_hat = (x - mean) / torch.sqrt(var + self.eps)
        return self.gamma * x_hat + self.beta
```

### 4.4 Weight Decay

**L2 regularization**:

$$
\mathcal{L}_{\text{reg}} = \mathcal{L}_{\text{data}} + \frac{\lambda}{2} \|\mathbf{W}\|_2^2
$$

$$
\frac{\partial \mathcal{L}_{\text{reg}}}{\partial W} = \frac{\partial \mathcal{L}_{\text{data}}}{\partial W} + \lambda W
$$

$$
W \leftarrow W - \eta \left( \nabla_W \mathcal{L}_{\text{data}} + \lambda W \right)
$$

For plain SGD this update is identical to shrinking the weights directly, $W \leftarrow W - \eta \nabla_W \mathcal{L}_{\text{data}} - \eta \lambda W$. The two only differ for adaptive optimizers: in Adam with L2 regularization the $\lambda W$ term is added to the gradient and is therefore rescaled by the adaptive step size (coupled weight decay), whereas AdamW subtracts $\eta \lambda W$ outside that rescaling (decoupled weight decay).

### 4.5 Data Augmentation

```python
from torchvision import transforms


def get_train_transforms():
    """Training-time data augmentation."""
    return transforms.Compose([
        # Horizontal flips suit Fashion-MNIST; remove this line for digit data
        # such as MNIST, where a mirrored digit is no longer a valid sample
        transforms.RandomHorizontalFlip(),
        transforms.RandomCrop(28, padding=4),
        transforms.ColorJitter(brightness=0.2, contrast=0.2),
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,)),  # MNIST mean / std
    ])


def get_test_transforms():
    """Test-time transforms (no augmentation)."""
    return transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,)),
    ])
```

---

## 5. Optimizers

### 5.1 SGD with Momentum

```python
class SGD:
    """SGD with momentum (educational re-implementation)."""

    def __init__(self, params, lr=0.01, momentum=0.9, weight_decay=0.0, nesterov=False):
        self.params = list(params)
        self.lr = lr
        self.momentum = momentum
        self.weight_decay = weight_decay
        self.nesterov = nesterov

        # One momentum buffer per parameter
        self.velocities = [torch.zeros_like(p) for p in self.params]

    @torch.no_grad()  # updates must not be recorded by autograd
    def step(self):
        """Apply one parameter update."""
        for i, param in enumerate(self.params):
            if param.grad is None:
                continue

            grad = param.grad

            # Weight decay (coupled: added to the gradient)
            if self.weight_decay != 0:
                grad = grad + self.weight_decay * param

            # Momentum update
            self.velocities[i] = self.momentum * self.velocities[i] + grad

            if self.nesterov:
                # Nesterov momentum
                update = self.momentum * self.velocities[i] + grad
            else:
                update = self.velocities[i]

            param.sub_(self.lr * update)

    def zero_grad(self):
        for param in self.params:
            if param.grad is not None:
                param.grad.zero_()


# Equivalent built-in PyTorch optimizer
optimizer = torch.optim.SGD(
    model.parameters(),
    lr=0.01,
    momentum=0.9,
    weight_decay=1e-4,
    nesterov=True
)
```

### 5.2 Adam

```python
class Adam:
    """Adam optimizer with coupled (L2) weight decay."""

    def __init__(self, params, lr=1e-3, betas=(0.9, 0.999), eps=1e-8, weight_decay=0.0):
        self.params = list(params)
        self.lr = lr
        self.beta1, self.beta2 = betas
        self.eps = eps
        self.weight_decay = weight_decay

        # Optimizer state
        self.m = [torch.zeros_like(p) for p in self.params]  # first moment
        self.v = [torch.zeros_like(p) for p in self.params]  # second moment
        self.t = 0  # time step

    @torch.no_grad()
    def step(self):
        self.t += 1

        for i, param in enumerate(self.params):
            if param.grad is None:
                continue

            grad = param.grad

            # Weight decay (L2): added to the gradient before the adaptive scaling
            if self.weight_decay != 0:
                grad = grad + self.weight_decay * param

            # First-moment estimate
            self.m[i] = self.beta1 * self.m[i] + (1 - self.beta1) * grad

            # Second-moment estimate
            self.v[i] = self.beta2 * self.v[i] + (1 - self.beta2) * grad ** 2

            # Bias correction
            m_hat = self.m[i] / (1 - self.beta1 ** self.t)
            v_hat = self.v[i] / (1 - self.beta2 ** self.t)

            # Parameter update
            param.sub_(self.lr * m_hat / (torch.sqrt(v_hat) + self.eps))

    def zero_grad(self):
        for param in self.params:
            if param.grad is not None:
                param.grad.zero_()


# Equivalent built-in PyTorch optimizer
optimizer = torch.optim.Adam(
    model.parameters(),
    lr=1e-3,
    betas=(0.9, 0.999),
    weight_decay=1e-4
)
```

### 5.3 AdamW (Decoupled Weight Decay)

```python
class AdamW:
    """AdamW optimizer (decoupled weight decay)."""

    def __init__(self, params, lr=1e-3, betas=(0.9, 0.999), eps=1e-8, weight_decay=1e-2):
        self.params = list(params)
        self.lr = lr
        self.beta1, self.beta2 = betas
        self.eps = eps
        self.weight_decay = weight_decay

        self.m = [torch.zeros_like(p) for p in self.params]
        self.v = [torch.zeros_like(p) for p in self.params]
        self.t = 0

    @torch.no_grad()
    def step(self):
        self.t += 1

        for i, param in enumerate(self.params):
            if param.grad is None:
                continue

            grad = param.grad

            # Adam moments (weight decay is NOT added to the gradient)
            self.m[i] = self.beta1 * self.m[i] + (1 - self.beta1) * grad
            self.v[i] = self.beta2 * self.v[i] + (1 - self.beta2) * grad ** 2

            m_hat = self.m[i] / (1 - self.beta1 ** self.t)
            v_hat = self.v[i] / (1 - self.beta2 ** self.t)

            # Parameter update: Adam step plus decoupled weight decay
            param.sub_(self.lr * (m_hat / (torch.sqrt(v_hat) + self.eps)
                                  + self.weight_decay * param))

    def zero_grad(self):
        for param in self.params:
            if param.grad is not None:
                param.grad.zero_()


# Equivalent built-in PyTorch optimizer
optimizer = torch.optim.AdamW(
    model.parameters(),
    lr=1e-3,
    weight_decay=0.01
)
```

---

## 6. Learning-Rate Scheduling

The hand-written schedulers below expect a `torch.optim` optimizer, because they read and write `optimizer.param_groups`.

### 6.1 Step Decay

```python
class StepLR:
    """Step decay: multiply the learning rate by gamma every step_size epochs."""

    def __init__(self, optimizer, step_size=30, gamma=0.1):
        self.optimizer = optimizer
        self.step_size = step_size
        self.gamma = gamma
        self.last_epoch = -1

    def step(self):
        self.last_epoch += 1
        if self.last_epoch > 0 and self.last_epoch % self.step_size == 0:
            for param_group in self.optimizer.param_groups:
                param_group['lr'] *= self.gamma
```

### 6.2 Cosine Annealing

```python
import math


class CosineAnnealingLR:
    """Cosine annealing from the initial learning rate down to eta_min."""

    def __init__(self, optimizer, T_max, eta_min=0):
        self.optimizer = optimizer
        self.T_max = T_max
        self.eta_min = eta_min
        # Store the initial learning rates (PyTorch calls them base_lrs);
        # reading the current lr instead would compound the decay every step
        self.base_lrs = [group['lr'] for group in optimizer.param_groups]
        self.last_epoch = -1

    def step(self):
        self.last_epoch += 1
        t = min(self.last_epoch, self.T_max)
        for base_lr, param_group in zip(self.base_lrs, self.optimizer.param_groups):
            param_group['lr'] = self.eta_min + 0.5 * (base_lr - self.eta_min) * (
                1 + math.cos(math.pi * t / self.T_max)
            )
```

### 6.3 Warmup + Cosine

```python
class WarmupCosineLR:
    """Linear warmup followed by cosine annealing."""

    def __init__(self, optimizer, warmup_epochs, total_epochs, eta_min=0):
        self.optimizer = optimizer
        self.warmup_epochs = warmup_epochs
        self.total_epochs = total_epochs
        self.eta_min = eta_min
        # Store the target learning rates once, before they are modified
        self.base_lrs = [group['lr'] for group in optimizer.param_groups]
        self.last_epoch = -1

    def step(self):
        self.last_epoch += 1

        for base_lr, param_group in zip(self.base_lrs, self.optimizer.param_groups):
            if self.last_epoch < self.warmup_epochs:
                # Warmup: linear increase up to base_lr
                lr = base_lr * (self.last_epoch + 1) / self.warmup_epochs
            else:
                # Cosine annealing (progress is clamped to 1 after total_epochs)
                progress = min(1.0, (self.last_epoch - self.warmup_epochs) / max(
                    1, self.total_epochs - self.warmup_epochs
                ))
                lr = self.eta_min + 0.5 * (base_lr - self.eta_min) * (
                    1 + math.cos(math.pi * progress)
                )

            param_group['lr'] = lr
```

### 6.4 Built-in PyTorch Schedulers

```python
# Pick one of the following.

# Step decay
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=30, gamma=0.1)

# Cosine annealing
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=100)

# Warmup + cosine
scheduler = torch.optim.lr_scheduler.SequentialLR(
    optimizer,
    schedulers=[
        torch.optim.lr_scheduler.LinearLR(optimizer, start_factor=0.1, total_iters=5),
        torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=95)
    ],
    milestones=[5]
)
```

---

## 7. End-to-End Training Pipeline

### 7.1 Trainer Class

```python
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
import time


class MLPTrainer:
    """Training loop for an MLP classifier."""

    def __init__(self, model, optimizer, scheduler=None, device='cuda'):
        self.model = model.to(device)
        self.optimizer = optimizer
        self.scheduler = scheduler
        self.device = device

        # Training history
        self.history = {
            'train_loss': [], 'train_acc': [],
            'val_loss': [], 'val_acc': [],
            'lr': []
        }

    def train_epoch(self, train_loader, criterion):
        """Train for one epoch; returns (mean loss, accuracy)."""
        self.model.train()
        total_loss = 0
        correct = 0
        total = 0

        for batch_x, batch_y in train_loader:
            batch_x = batch_x.to(self.device)
            batch_y = batch_y.to(self.device)

            self.optimizer.zero_grad()
            outputs = self.model(batch_x)
            loss = criterion(outputs, batch_y)
            loss.backward()

            # Gradient clipping
            torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)

            self.optimizer.step()

            total_loss += loss.item() * batch_x.size(0)
            _, predicted = outputs.max(1)
            total += batch_y.size(0)
            correct += predicted.eq(batch_y).sum().item()

        return total_loss / total, correct / total

    def evaluate(self, val_loader, criterion):
        """Evaluate without gradient tracking; returns (mean loss, accuracy)."""
        self.model.eval()
        total_loss = 0
        correct = 0
        total = 0

        with torch.no_grad():
            for batch_x, batch_y in val_loader:
                batch_x = batch_x.to(self.device)
                batch_y = batch_y.to(self.device)

                outputs = self.model(batch_x)
                loss = criterion(outputs, batch_y)

                total_loss += loss.item() * batch_x.size(0)
                _, predicted = outputs.max(1)
                total += batch_y.size(0)
                correct += predicted.eq(batch_y).sum().item()

        return total_loss / total, correct / total

    def fit(self, train_loader, val_loader, epochs, criterion):
        """Full training loop with best-checkpoint tracking."""
        best_val_acc = 0
        best_state = None

        for epoch in range(epochs):
            start = time.time()

            # Train
            train_loss, train_acc = self.train_epoch(train_loader, criterion)

            # Validate
            if val_loader is not None:
                val_loss, val_acc = self.evaluate(val_loader, criterion)
                self.history['val_loss'].append(val_loss)
                self.history['val_acc'].append(val_acc)

                if val_acc > best_val_acc:
                    best_val_acc = val_acc
                    best_state = {k: v.cpu().clone() for k, v in self.model.state_dict().items()}
            else:
                val_loss, val_acc = 0, 0

            # Learning-rate schedule
            if self.scheduler is not None:
                if isinstance(self.scheduler, torch.optim.lr_scheduler.ReduceLROnPlateau):
                    self.scheduler.step(val_loss)
                else:
                    self.scheduler.step()

            # Bookkeeping
            self.history['train_loss'].append(train_loss)
            self.history['train_acc'].append(train_acc)
            current_lr = self.optimizer.param_groups[0]['lr']
            self.history['lr'].append(current_lr)

            elapsed = time.time() - start
            print(f"Epoch {epoch+1}/{epochs} ({elapsed:.1f}s) - "
                  f"train_loss: {train_loss:.4f}, train_acc: {train_acc:.4f} - "
                  f"val_loss: {val_loss:.4f}, val_acc: {val_acc:.4f} - "
                  f"lr: {current_lr:.2e}")

        # Restore the best checkpoint
        if best_state is not None:
            self.model.load_state_dict(best_state)
            print(f"\nBest validation accuracy: {best_val_acc:.4f}")

        return self.history
```

### 7.2 Complete Usage Example

```python
def train_mlp_mnist():
    """Complete example: train the MLP on MNIST."""
    from torchvision import datasets, transforms

    # Data
    transform_train = transforms.Compose([
        transforms.RandomCrop(28, padding=2),
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,))
    ])
    transform_test = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,))
    ])

    train_dataset = datasets.MNIST('./data', train=True, download=True, transform=transform_train)
    test_dataset = datasets.MNIST('./data', train=False, transform=transform_test)

    train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True, num_workers=2)
    test_loader = DataLoader(test_dataset, batch_size=256, shuffle=False, num_workers=2)

    # Model
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    model = MLP(
        input_dim=784,
        hidden_dims=[512, 256, 128],
        output_dim=10,
        activation='gelu',
        output_activation='log_softmax',
        use_batchnorm=True,
        dropout_rate=0.3,
        init_method='he',
        weight_decay=1e-4  # stored on the model only; the optimizer below applies the decay
    )

    # Optimizer
    optimizer = torch.optim.AdamW(model.parameters(), lr=1e-3, weight_decay=1e-2)

    # Learning-rate schedule: 3 warmup epochs, then 27 cosine epochs
    scheduler = torch.optim.lr_scheduler.SequentialLR(
        optimizer,
        schedulers=[
            torch.optim.lr_scheduler.LinearLR(optimizer, start_factor=0.1, total_iters=3),
            torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=27)
        ],
        milestones=[3]
    )

    # Loss: the model already outputs log-probabilities (log_softmax), so NLLLoss
    # is the matching criterion; use CrossEntropyLoss only with raw logits
    criterion = nn.NLLLoss()

    # Train. Note: the test set doubles as the validation set here for brevity;
    # hold out part of the training set if the reported accuracy must be unbiased.
    trainer = MLPTrainer(model, optimizer, scheduler, device=device)
    history = trainer.fit(
        train_loader, test_loader,
        epochs=30,
        criterion=criterion
    )

    return model, history


# Run
if __name__ == '__main__':
    model, history = train_mlp_mnist()
```

### 7.3 Monitoring and Visualization

```python
import matplotlib.pyplot as plt


def plot_training_history(history):
    """Plot loss, accuracy, and learning-rate curves."""
    fig, axes = plt.subplots(1, 3, figsize=(15, 4))

    # Loss
    axes[0].plot(history['train_loss'], label='Train Loss')
    axes[0].plot(history['val_loss'], label='Val Loss')
    axes[0].set_xlabel('Epoch')
    axes[0].set_ylabel('Loss')
    axes[0].legend()
    axes[0].set_title('Loss Curve')

    # Accuracy
    axes[1].plot(history['train_acc'], label='Train Acc')
    axes[1].plot(history['val_acc'], label='Val Acc')
    axes[1].set_xlabel('Epoch')
    axes[1].set_ylabel('Accuracy')
    axes[1].legend()
    axes[1].set_title('Accuracy Curve')

    # Learning rate
    axes[2].plot(history['lr'])
    axes[2].set_xlabel('Epoch')
    axes[2].set_ylabel('Learning Rate')
    axes[2].set_yscale('log')
    axes[2].set_title('LR Schedule')

    plt.tight_layout()
    plt.savefig('training_history.png', dpi=100)
    plt.show()
```

---

## 8. Debugging Tips

### 8.1 Troubleshooting Common Problems

**Problem 1: the loss does not decrease**

```python
# Check 1: learning rate
print(f"Current lr: {optimizer.param_groups[0]['lr']}")
# Try: 10x smaller or 10x larger

# Check 2: gradients (run after loss.backward())
for name, param in model.named_parameters():
    if param.grad is not None:
        print(f"{name}: grad mean={param.grad.mean():.2e}, std={param.grad.std():.2e}")

# Check 3: input normalization (x is one batch of inputs)
print(f"Input statistics: mean={x.mean():.4f}, std={x.std():.4f}")
```

**Problem 2: overfitting**

```python
# Add regularization (in PyTorch, pass weight decay to the optimizer as well)
model = MLP(..., dropout_rate=0.5, weight_decay=1e-3)

# Data augmentation
transform = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.RandomCrop(28, padding=4),
    # ...
])

# Early stopping
early_stop_patience = 10
```

**Problem 3: vanishing/exploding gradients**

```python
# Monitor activations
def hook_fn(name):
    def fn(module, input, output):
        print(f"{name}: mean={output.mean():.4f}, std={output.std():.4f}")
    return fn

for name, module in model.named_modules():
    if isinstance(module, nn.Linear):
        module.register_forward_hook(hook_fn(name))

# Monitor gradients
def grad_hook(name):
    def fn(grad):
        print(f"{name}: grad mean={grad.mean():.4e}, std={grad.std():.4e}")
    return fn

for name, param in model.named_parameters():
    if 'weight' in name:
        param.register_hook(grad_hook(name))
```

### 8.2 Unit Tests

```python
def test_mlp_forward_backward():
    """Forward and backward pass of the MLP."""
    torch.manual_seed(42)

    model = MLP(10, [20, 15], 5, activation='relu', output_activation='log_softmax')

    # Forward
    x = torch.randn(3, 10)
    y_pred = model(x)
    assert y_pred.shape == (3, 5), f"Wrong output shape: {y_pred.shape}"

    # Backward
    y_true = torch.tensor([0, 2, 4])
    criterion = nn.NLLLoss()
    loss = criterion(y_pred, y_true)
    loss.backward()

    # Check gradients
    for name, param in model.named_parameters():
        assert param.grad is not None, f"{name} has no gradient"
        assert torch.isfinite(param.grad).all(), f"{name} has non-finite gradients"


def test_dropout():
    """Dropout is random in training mode and a no-op in eval mode."""
    model = nn.Sequential(nn.Linear(10, 5), nn.Dropout(0.5))

    # Training mode: outputs should differ between calls
    model.train()
    x = torch.ones(100, 10)
    out1 = model(x)
    out2 = model(x)
    assert not torch.allclose(out1, out2), "Dropout had no effect in training mode"

    # Eval mode: outputs should be deterministic
    model.eval()
    out1 = model(x)
    out2 = model(x)
    assert torch.allclose(out1, out2), "Eval-mode outputs are inconsistent"


def test_batchnorm():
    """BatchNorm uses batch statistics in training and running statistics in eval."""
    torch.manual_seed(0)
    bn = nn.BatchNorm1d(5)

    # Training mode: every feature is normalized with the batch statistics
    bn.train()
    x = torch.randn(100, 5)
    out_train = bn(x)
    assert torch.allclose(out_train.mean(dim=0), torch.zeros(5), atol=1e-5), "Batch mean is not 0"

    # Eval mode: the running statistics are used instead, so after a single
    # training step the output generally differs from out_train
    bn.eval()
    out_eval = bn(x)
    expected = (x - bn.running_mean) / torch.sqrt(bn.running_var + bn.eps)
    assert torch.allclose(out_eval, expected, atol=1e-5), "Eval output does not use the running statistics"
```

---

## 9. Best Practices

### 9.1 MLP Design Principles

1. **Width vs. depth**: for hidden layers of equal size, parameter count and compute grow roughly quadratically with width and linearly with depth. For a fixed parameter budget, extra depth often adds more expressive power than extra width, but deeper networks are harder to optimize.
2. **Bottleneck design**: avoid abrupt drops in dimension (e.g. 1024 → 10); intermediate layers should shrink gradually.
3. **Residual connections**: in very deep MLPs, residual connections mitigate vanishing gradients.

```python
class ResidualBlock(nn.Module):
    """Pre-norm residual MLP block."""

    def __init__(self, dim, activation='gelu', dropout=0.1):
        super().__init__()
        self.norm = nn.LayerNorm(dim)
        self.fc1 = nn.Linear(dim, 4 * dim)
        self.fc2 = nn.Linear(4 * dim, dim)
        self.activation = nn.GELU() if activation == 'gelu' else nn.ReLU()
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        residual = x
        x = self.norm(x)
        x = self.fc1(x)
        x = self.activation(x)
        x = self.dropout(x)
        x = self.fc2(x)
        x = self.dropout(x)
        return x + residual  # residual connection
```

### 9.2 Training Strategy

1. **Learning-rate search**: sweep exponentially from 1e-4 to 1e-1
2. **Learning-rate warmup**: increase linearly from near zero over the first few epochs
3. **Early stopping**: monitor the validation loss and stop after 10-20 epochs without improvement
4. **Model ensembling**: train several models and average their predictions

### 9.3 Deployment Optimization

1. **Quantization**: FP32 → INT8 (4x smaller weights)
2. **Pruning**: remove connections with small weights
3. **Distillation**: train a small model to mimic a large one
4. **ONNX export**: cross-platform deployment

```python
# ONNX export (switch to eval mode first so dropout and BatchNorm are frozen)
model.eval()
dummy_input = torch.randn(1, 784)
torch.onnx.export(
    model,
    dummy_input,
    "mlp.onnx",
    input_names=['input'],
    output_names=['output'],
    dynamic_axes={'input': {0: 'batch'}, 'output': {0: 'batch'}}
)
```

---

## 10. References

[^1]: Goodfellow, I., Bengio, Y., & Courville, A. (2016). *Deep Learning*. MIT Press. Chapter 6: Deep Feedforward Networks.

---

*Last updated: 2026-06-21*