概述
多层感知机(MLP)是深度学习最基础也最重要的架构。本文档提供一份从零开始的完整实现指南,涵盖:
- NumPy从零实现:理解每个数学细节
- PyTorch完整实现:工业级训练流程
- 初始化方案:Xavier、He、LSUV、正交初始化
- 正则化技术:Dropout、BatchNorm、Weight Decay、LayerNorm
- 优化器:SGD、Adam、AdamW
- 学习率调度:Step、Cosine、Warmup
- 完整训练流水线:MNIST/Fashion-MNIST分类
MLP是所有深度学习架构的“原子”,深入理解其实现细节对理解CNN、RNN、Transformer至关重要。[^1]
一、MLP的数学基础
1.1 网络结构
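$L$ 层MLP的数学形式:

$$\mathbf{a}^{(0)} = \mathbf{x}, \qquad \mathbf{z}^{(l)} = \mathbf{W}^{(l)} \mathbf{a}^{(l-1)} + \mathbf{b}^{(l)}, \qquad \mathbf{a}^{(l)} = \sigma^{(l)}\big(\mathbf{z}^{(l)}\big), \qquad l = 1, \dots, L$$

$$\hat{\mathbf{y}} = \mathbf{a}^{(L)}$$

其中:
- $\mathbf{x} \in \mathbb{R}^{d_0}$ 是输入
- $\mathbf{W}^{(l)} \in \mathbb{R}^{d_l \times d_{l-1}}$ 是第 $l$ 层权重
- $\mathbf{b}^{(l)} \in \mathbb{R}^{d_l}$ 是偏置
- $\sigma^{(l)}$ 是第 $l$ 层激活函数
- $\mathbf{a}^{(L)}$ 是输出
- $\hat{\mathbf{y}}$ 是预测概率(分类任务中 $\sigma^{(L)}$ 取 softmax)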
1.2 损失函数
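交叉熵损失(多分类,$m$ 个样本、$K$ 个类别):

$$\mathcal{L}_{\text{CE}} = -\frac{1}{m} \sum_{i=1}^{m} \sum_{k=1}^{K} y_{ik} \log \hat{y}_{ik}$$

均方误差(回归,对全部 $m \times K$ 个输出元素取平均,与后文NumPy实现中的 `np.mean` 一致):

$$\mathcal{L}_{\text{MSE}} = \frac{1}{mK} \sum_{i=1}^{m} \sum_{k=1}^{K} \left( \hat{y}_{ik} - y_{ik} \right)^2$$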
1.3 反向传播
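使用链式法则,损失对参数的梯度:

$$\frac{\partial \mathcal{L}}{\partial \mathbf{W}^{(l)}} = \boldsymbol{\delta}^{(l)} \big(\mathbf{a}^{(l-1)}\big)^\top, \qquad \frac{\partial \mathcal{L}}{\partial \mathbf{b}^{(l)}} = \boldsymbol{\delta}^{(l)}$$

其中 $\boldsymbol{\delta}^{(l)} = \partial \mathcal{L} / \partial \mathbf{z}^{(l)}$ 是误差信号。

递推关系:

$$\boldsymbol{\delta}^{(l)} = \Big( \big(\mathbf{W}^{(l+1)}\big)^\top \boldsymbol{\delta}^{(l+1)} \Big) \odot \sigma'^{(l)}\big(\mathbf{z}^{(l)}\big)$$

对于 softmax 输出配合交叉熵损失,输出层误差化简为 $\boldsymbol{\delta}^{(L)} = \hat{\mathbf{y}} - \mathbf{y}$。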
二、NumPy从零实现
2.1 完整实现
import numpy as np
class MLP:
    """Multi-layer perceptron implemented from scratch with NumPy.

    Samples are stored as rows: inputs have shape ``(batch, layer_dims[0])``
    and the weight of layer ``l`` (1-based) has shape
    ``(layer_dims[l - 1], layer_dims[l])``. Parameters live in
    ``self.parameters`` under the keys ``'W{l}'`` and ``'b{l}'``.
    """

    def __init__(self, layer_dims, activations, init_method='he',
                 dropout_rate=0.0, weight_decay=0.0):
        """Build the network and initialise its parameters.

        Args:
            layer_dims: Width of every layer, e.g. ``[784, 256, 128, 10]``.
            activations: One activation name per weight layer, e.g.
                ``['relu', 'relu', 'softmax']``.
            init_method: ``'xavier'``, ``'he'`` or ``'xavier_normal'``; any
                other value falls back to a fixed scale of 0.01.
            dropout_rate: Probability of dropping a hidden unit, in ``[0, 1)``.
            weight_decay: L2 regularisation coefficient.

        Raises:
            ValueError: If ``activations`` does not have one entry per weight
                layer, or ``dropout_rate`` is outside ``[0, 1)``.
        """
        if len(activations) != len(layer_dims) - 1:
            # compute_loss/backward read activations[-1] as the output
            # activation, so a length mismatch silently picks the wrong loss.
            raise ValueError(
                f"activations数量应为{len(layer_dims) - 1}, 实际为{len(activations)}")
        if not 0.0 <= dropout_rate < 1.0:
            # A rate of 1.0 would divide by zero in the inverted-dropout scale.
            raise ValueError(f"dropout_rate必须在[0, 1)范围内: {dropout_rate}")

        self.num_layers = len(layer_dims) - 1
        self.layer_dims = layer_dims
        self.activations = activations
        self.dropout_rate = dropout_rate
        self.weight_decay = weight_decay

        self.parameters = {}
        self._initialize_parameters(init_method)

        # Dropout is only applied while this flag is True.
        self.training = True

    def _initialize_parameters(self, method):
        """Draw Gaussian weights scaled according to ``method``; zero biases."""
        for l in range(1, self.num_layers + 1):
            fan_in = self.layer_dims[l - 1]
            fan_out = self.layer_dims[l]

            if method == 'xavier':
                # Xavier/Glorot: Var(W) = 2 / (fan_in + fan_out)
                scale = np.sqrt(2.0 / (fan_in + fan_out))
            elif method == 'he':
                # He/Kaiming: Var(W) = 2 / fan_in (suited to ReLU)
                scale = np.sqrt(2.0 / fan_in)
            elif method == 'xavier_normal':
                # NOTE: Var(W) = 1 / fan_in is LeCun-style scaling; the name
                # is kept unchanged for backward compatibility.
                scale = np.sqrt(1.0 / fan_in)
            else:
                scale = 0.01

            self.parameters[f'W{l}'] = np.random.randn(fan_in, fan_out) * scale
            self.parameters[f'b{l}'] = np.zeros((1, fan_out))

    def _activate(self, z, activation):
        """Apply the named activation element-wise (softmax: row-wise).

        Raises:
            ValueError: If ``activation`` is not a known name.
        """
        if activation == 'relu':
            return np.maximum(0, z)
        if activation == 'sigmoid':
            # Clip to keep exp() from overflowing.
            return 1.0 / (1.0 + np.exp(-np.clip(z, -500, 500)))
        if activation == 'tanh':
            return np.tanh(z)
        if activation == 'softmax':
            # Subtract the row maximum for numerical stability.
            z_shifted = z - np.max(z, axis=1, keepdims=True)
            exp_z = np.exp(z_shifted)
            return exp_z / np.sum(exp_z, axis=1, keepdims=True)
        if activation == 'leaky_relu':
            return np.where(z > 0, z, 0.01 * z)
        if activation == 'identity':
            return z
        raise ValueError(f"未知激活函数: {activation}")

    def _activate_derivative(self, z, activation):
        """Return the element-wise derivative of the named activation at ``z``.

        For ``'softmax'`` this returns ones because ``backward`` folds the
        softmax Jacobian into the cross-entropy gradient.

        Raises:
            ValueError: If ``activation`` is not a known name.
        """
        if activation == 'relu':
            return (z > 0).astype(float)
        if activation == 'sigmoid':
            s = self._activate(z, 'sigmoid')
            return s * (1 - s)
        if activation == 'tanh':
            return 1 - np.tanh(z) ** 2
        if activation == 'leaky_relu':
            return np.where(z > 0, 1.0, 0.01)
        if activation in ('softmax', 'identity'):
            return np.ones_like(z)
        raise ValueError(f"未知激活函数: {activation}")

    def forward(self, X):
        """Run a forward pass and cache the intermediates for ``backward``.

        Args:
            X: Input array of shape ``(batch, layer_dims[0])``.

        Returns:
            The output-layer activation, shape ``(batch, layer_dims[-1])``.
        """
        self.cache = {'A0': X}
        self.dropout_masks = {}

        A = X
        for l in range(1, self.num_layers + 1):
            W = self.parameters[f'W{l}']
            b = self.parameters[f'b{l}']

            Z = A @ W + b
            self.cache[f'Z{l}'] = Z
            A = self._activate(Z, self.activations[l - 1])

            # Inverted dropout on hidden layers only: rescale at train time so
            # that inference needs no correction.
            if self.training and self.dropout_rate > 0 and l < self.num_layers:
                mask = (np.random.rand(*A.shape) > self.dropout_rate).astype(float)
                A = A * mask / (1 - self.dropout_rate)
                self.dropout_masks[f'D{l}'] = mask

            self.cache[f'A{l}'] = A
        return A

    def compute_loss(self, Y_pred, Y_true):
        """Return the scalar loss including the L2 penalty.

        Cross-entropy is used when the last activation is ``'softmax'``;
        otherwise the mean squared error over all output elements.
        """
        m = Y_true.shape[0]

        if self.activations[-1] == 'softmax':
            epsilon = 1e-15  # keeps log() finite
            Y_pred_clipped = np.clip(Y_pred, epsilon, 1 - epsilon)
            loss = -np.mean(np.sum(Y_true * np.log(Y_pred_clipped), axis=1))
        else:
            loss = np.mean((Y_pred - Y_true) ** 2)

        if self.weight_decay > 0:
            reg_loss = sum(
                np.sum(self.parameters[f'W{l}'] ** 2)
                for l in range(1, self.num_layers + 1)
            )
            loss += 0.5 * self.weight_decay * reg_loss / m
        return loss

    def backward(self, Y_pred, Y_true):
        """Back-propagate and return the gradients of ``compute_loss``.

        Must be called after ``forward`` on the same batch, because it reads
        ``self.cache`` and ``self.dropout_masks``.

        Returns:
            Dict with keys ``'dW{l}'`` and ``'db{l}'`` for every layer.
        """
        m = Y_true.shape[0]
        gradients = {}

        if self.activations[-1] == 'softmax':
            # Combined softmax + cross-entropy gradient w.r.t. the logits.
            dA = (Y_pred - Y_true) / m
        else:
            # compute_loss averages over every output element, so the gradient
            # must be divided by the element count rather than the batch size.
            dA = 2 * (Y_pred - Y_true) / Y_pred.size

        for l in reversed(range(1, self.num_layers + 1)):
            W = self.parameters[f'W{l}']
            Z = self.cache[f'Z{l}']
            A_prev = self.cache[f'A{l-1}']
            activation = self.activations[l - 1]

            # Undo the dropout applied to this layer's activation.
            if f'D{l}' in self.dropout_masks:
                dA = dA * self.dropout_masks[f'D{l}'] / (1 - self.dropout_rate)

            if activation == 'softmax':
                dZ = dA  # already the gradient w.r.t. the logits
            else:
                dZ = dA * self._activate_derivative(Z, activation)

            gradients[f'dW{l}'] = A_prev.T @ dZ + self.weight_decay * W / m
            gradients[f'db{l}'] = np.sum(dZ, axis=0, keepdims=True)

            dA = dZ @ W.T
        return gradients

    def update_parameters(self, gradients, learning_rate):
        """Apply one plain gradient-descent step in place."""
        for l in range(1, self.num_layers + 1):
            self.parameters[f'W{l}'] -= learning_rate * gradients[f'dW{l}']
            self.parameters[f'b{l}'] -= learning_rate * gradients[f'db{l}']

    def train(self, X, Y, X_val=None, Y_val=None, epochs=100, batch_size=32,
              learning_rate=0.001, verbose=True):
        """Train with mini-batch gradient descent.

        Args:
            X, Y: Training inputs and one-hot (or regression) targets.
            X_val, Y_val: Optional validation data evaluated after each epoch.
            epochs: Number of passes over the training data.
            batch_size: Mini-batch size; the last batch may be smaller.
            learning_rate: Step size for ``update_parameters``.
            verbose: Print progress every 10 epochs.

        Returns:
            Dict with the lists ``'train_loss'``, ``'val_loss'``, ``'val_acc'``.
        """
        history = {'train_loss': [], 'val_loss': [], 'val_acc': []}
        n_samples = X.shape[0]

        for epoch in range(epochs):
            indices = np.random.permutation(n_samples)
            X_shuffled = X[indices]
            Y_shuffled = Y[indices]

            epoch_loss = 0
            n_batches = 0
            for i in range(0, n_samples, batch_size):
                X_batch = X_shuffled[i:i+batch_size]
                Y_batch = Y_shuffled[i:i+batch_size]

                Y_pred = self.forward(X_batch)
                epoch_loss += self.compute_loss(Y_pred, Y_batch)
                n_batches += 1

                gradients = self.backward(Y_pred, Y_batch)
                self.update_parameters(gradients, learning_rate)

            avg_loss = epoch_loss / n_batches
            history['train_loss'].append(avg_loss)

            if X_val is not None:
                # Evaluate without dropout, then switch back to training mode.
                self.training = False
                try:
                    Y_val_pred = self.forward(X_val)
                finally:
                    self.training = True
                val_loss = self.compute_loss(Y_val_pred, Y_val)
                val_acc = np.mean(np.argmax(Y_val_pred, axis=1) == np.argmax(Y_val, axis=1))
                history['val_loss'].append(val_loss)
                history['val_acc'].append(val_acc)

                if verbose and (epoch + 1) % 10 == 0:
                    print(f"Epoch {epoch+1}/{epochs} - "
                          f"train_loss: {avg_loss:.4f} - "
                          f"val_loss: {val_loss:.4f} - "
                          f"val_acc: {val_acc:.4f}")
            elif verbose and (epoch + 1) % 10 == 0:
                print(f"Epoch {epoch+1}/{epochs} - train_loss: {avg_loss:.4f}")

        return history

    def predict(self, X):
        """Return the arg-max class index for every row of ``X``.

        Dropout is disabled for the prediction and the previous value of
        ``self.training`` is restored afterwards, so calling ``predict`` in
        the middle of training does not switch dropout off for good.
        """
        was_training = self.training
        self.training = False
        try:
            Y_pred = self.forward(X)
        finally:
            self.training = was_training
        return np.argmax(Y_pred, axis=1)


# Section 2.2: MNIST训练测试 (MNIST training test)
def load_mnist_simple():
    """Load MNIST as flat NumPy arrays with one-hot labels (needs torchvision).

    Images are normalised with mean 0.1307 / std 0.3081 and flattened to 784
    values. The data is downloaded to ``./data`` when it is not present.

    Returns:
        Tuple ``(X_train, Y_train, X_test, Y_test)``; the ``X`` arrays have one
        flattened image per row and the ``Y`` arrays are one-hot with 10 columns.
    """
    from torchvision import datasets, transforms

    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,)),
        transforms.Lambda(lambda x: x.view(-1)),  # flatten to 784 values
    ])

    def to_arrays(dataset):
        # Walk the dataset once: every __getitem__ call runs the transform
        # pipeline, so separate passes for images and labels double the work.
        images, labels = zip(*((img.numpy(), label) for img, label in dataset))
        return np.stack(images), np.eye(10)[np.array(labels)]

    train_data = datasets.MNIST('./data', train=True, download=True, transform=transform)
    # download=True here too, so loading the test split alone also works.
    test_data = datasets.MNIST('./data', train=False, download=True, transform=transform)

    X_train, Y_train = to_arrays(train_data)
    X_test, Y_test = to_arrays(test_data)
    return X_train, Y_train, X_test, Y_test
# Train the NumPy MLP on MNIST: 50k samples for training, the remaining
# 10k of the training split for validation.
X_train, Y_train, X_test, Y_test = load_mnist_simple()

mlp = MLP(
    layer_dims=[784, 256, 128, 10],
    activations=['relu', 'relu', 'softmax'],
    init_method='he',
    dropout_rate=0.2,
    weight_decay=1e-4
)

history = mlp.train(
    X_train[:50000], Y_train[:50000],
    X_val=X_train[50000:], Y_val=Y_train[50000:],
    epochs=30,
    batch_size=64,
    learning_rate=0.001
)

# Evaluate on the held-out test split.
test_acc = np.mean(mlp.predict(X_test) == np.argmax(Y_test, axis=1))
print(f"测试集准确率: {test_acc:.4f}")


# Section 三、PyTorch完整实现 (complete PyTorch implementation)
3.1 模块化MLP
import torch
import torch.nn as nn
import torch.nn.functional as F
class LinearBlock(nn.Module):
    """Linear layer followed by optional BatchNorm, an activation and Dropout.

    Args:
        in_features: Input width.
        out_features: Output width.
        activation: One of ``'relu'``, ``'leaky_relu'``, ``'gelu'``, ``'silu'``,
            ``'tanh'``, ``'identity'``.
        use_batchnorm: Insert ``nn.BatchNorm1d`` after the linear layer.
        dropout_rate: Dropout probability; 0 disables dropout.
        init_method: ``'he'``, ``'xavier'``, ``'orthogonal'`` or ``'lsuv'``.

    Raises:
        ValueError: If ``activation`` or ``init_method`` is not recognised.
    """

    # Factories rather than instances so that every block gets its own module.
    _ACTIVATIONS = {
        'relu': lambda: nn.ReLU(inplace=True),
        'leaky_relu': lambda: nn.LeakyReLU(0.01, inplace=True),
        'gelu': nn.GELU,
        'silu': nn.SiLU,
        'tanh': nn.Tanh,
        'identity': nn.Identity,
    }

    def __init__(self, in_features, out_features, activation='relu',
                 use_batchnorm=True, dropout_rate=0.0, init_method='he'):
        super().__init__()
        self.linear = nn.Linear(in_features, out_features)
        self.use_bn = use_batchnorm
        if use_batchnorm:
            self.bn = nn.BatchNorm1d(out_features)

        if activation not in self._ACTIVATIONS:
            raise ValueError(f"未知激活: {activation}")
        self.activation = self._ACTIVATIONS[activation]()

        self.dropout = nn.Dropout(dropout_rate) if dropout_rate > 0 else nn.Identity()

        self._init_weights(init_method, activation)

    def _init_weights(self, method, activation):
        """Initialise the linear weight according to ``method``; zero the bias.

        Raises:
            ValueError: If ``method`` is not a known initialisation scheme.
        """
        weight = self.linear.weight
        # True only for 'lsuv', which needs data and is therefore applied
        # later by an external routine; the weight keeps nn.Linear's default.
        self._lsuv = False

        if method == 'he':
            if activation == 'leaky_relu':
                # Use the gain that matches the 0.01 negative slope.
                nn.init.kaiming_normal_(weight, a=0.01, nonlinearity='leaky_relu')
            elif activation in ('relu', 'gelu', 'silu'):
                # GELU/SiLU behave like ReLU for initialisation purposes.
                nn.init.kaiming_normal_(weight, nonlinearity='relu')
            else:
                nn.init.xavier_normal_(weight)
        elif method == 'xavier':
            nn.init.xavier_normal_(weight)
        elif method == 'orthogonal':
            nn.init.orthogonal_(weight, gain=1.0)
        elif method == 'lsuv':
            self._lsuv = True
        else:
            # A typo used to fall through silently to the default init.
            raise ValueError(f"未知初始化方法: {method}")

        if self.linear.bias is not None:
            nn.init.zeros_(self.linear.bias)

    def forward(self, x):
        """Apply linear -> (batchnorm) -> activation -> dropout."""
        out = self.linear(x)
        if self.use_bn:
            out = self.bn(out)
        out = self.activation(out)
        return self.dropout(out)
class MLP(nn.Module):
    """Multi-layer perceptron built from ``LinearBlock`` hidden layers.

    Args:
        input_dim: Number of input features after flattening.
        hidden_dims: Width of every hidden layer; may be empty.
        output_dim: Number of outputs.
        activation: Hidden activation name, forwarded to ``LinearBlock``.
        output_activation: ``'identity'``, ``'softmax'``, ``'log_softmax'`` or
            ``'sigmoid'``.
        use_batchnorm: Use BatchNorm in the hidden blocks.
        dropout_rate: Dropout probability in the hidden blocks.
        init_method: Weight initialisation scheme for the hidden blocks; the
            output layer uses Kaiming-normal for ``'he'`` and Xavier-normal
            otherwise.
        weight_decay: Stored on the instance only; the model itself does not
            apply it.

    Raises:
        ValueError: If ``output_activation`` is not recognised.
    """

    def __init__(self, input_dim, hidden_dims, output_dim,
                 activation='relu', output_activation='identity',
                 use_batchnorm=True, dropout_rate=0.0,
                 init_method='he', weight_decay=0.0):
        super().__init__()
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.weight_decay = weight_decay

        layers = []
        prev_dim = input_dim
        for hidden_dim in hidden_dims:
            layers.append(LinearBlock(
                in_features=prev_dim,
                out_features=hidden_dim,
                activation=activation,
                use_batchnorm=use_batchnorm,
                dropout_rate=dropout_rate,
                init_method=init_method
            ))
            prev_dim = hidden_dim

        output_layer = nn.Linear(prev_dim, output_dim)
        if init_method == 'he':
            nn.init.kaiming_normal_(output_layer.weight)
        else:
            nn.init.xavier_normal_(output_layer.weight)
        nn.init.zeros_(output_layer.bias)
        layers.append(output_layer)

        if output_activation == 'softmax':
            layers.append(nn.Softmax(dim=-1))
        elif output_activation == 'log_softmax':
            layers.append(nn.LogSoftmax(dim=-1))
        elif output_activation == 'sigmoid':
            layers.append(nn.Sigmoid())
        elif output_activation not in ('identity', None):
            # A misspelt name used to produce raw logits without any warning.
            raise ValueError(f"未知输出激活: {output_activation}")

        self.network = nn.Sequential(*layers)

    def forward(self, x):
        """Flatten everything after the batch dimension and run the network."""
        if x.dim() > 2:
            # flatten() also handles non-contiguous inputs (e.g. after a
            # transpose or permute), where view() raises a RuntimeError.
            x = x.flatten(1)
        return self.network(x)
# Example: a 784-512-256-128-10 classifier that outputs log-probabilities.
model = MLP(
    input_dim=784,
    hidden_dims=[512, 256, 128],
    output_dim=10,
    activation='relu',
    output_activation='log_softmax',
    use_batchnorm=True,
    dropout_rate=0.3,
    init_method='he'
)
print(model)


# Section 3.2: LSUV初始化 (LSUV initialisation)
Layer-Sequential Unit-Variance (LSUV) 是Mishkin & Matas (2015)提出的实用初始化方法:
def lsuv_init(model, data_batch, target_std=1.0, target_mean=0.0, max_iter=10, tol=1e-3):
    """LSUV-style initialisation: rescale each layer to a target output std/mean.

    Every ``nn.Linear`` / ``nn.Conv2d`` module is visited in
    ``model.named_modules()`` order. Its weight is scaled (and its bias
    shifted, if it has one) until the standard deviation and mean of its
    output on ``data_batch`` are within ``tol`` of the targets, or
    ``max_iter`` adjustments have been made.

    Args:
        model: Module to initialise; modified in place.
        data_batch: Representative input batch passed to ``model``.
        target_std: Desired standard deviation of each layer output.
        target_mean: Desired mean of each layer output.
        max_iter: Maximum number of adjustments per layer.
        tol: Absolute tolerance on both statistics.

    The statistics are collected in eval mode; the train/eval mode the model
    had on entry is restored on exit, and the temporary forward hooks are
    removed even if a forward pass raises.
    """
    was_training = model.training
    model.eval()

    outputs = {}
    handles = []

    def make_hook(name):
        def fn(module, input, output):
            outputs[name] = output.detach()
        return fn

    layers = [
        (name, module) for name, module in model.named_modules()
        if isinstance(module, (nn.Conv2d, nn.Linear))
    ]
    for name, module in layers:
        handles.append(module.register_forward_hook(make_hook(name)))

    try:
        # Everything here is initialisation, so no graph should be recorded.
        with torch.no_grad():
            model(data_batch)

            for name, module in layers:
                if name not in outputs:
                    continue  # layer was not reached by the forward pass

                for _ in range(max_iter):
                    # Always measure the output of the most recent forward
                    # pass, so the stopping test reflects the adjusted weights.
                    out = outputs[name]
                    current_std = out.std().item()
                    current_mean = out.mean().item()

                    std_ok = abs(current_std - target_std) <= tol
                    # Without a bias the mean cannot be corrected; do not let
                    # it keep the loop running.
                    mean_ok = (module.bias is None
                               or abs(current_mean - target_mean) <= tol)
                    if std_ok and mean_ok:
                        break

                    if not std_ok:
                        module.weight.mul_(target_std / (current_std + 1e-8))
                    if not mean_ok:
                        module.bias.add_(target_mean - current_mean)

                    # Refresh the hooked outputs (this layer and all later ones).
                    model(data_batch)
    finally:
        for handle in handles:
            handle.remove()
        model.train(was_training)


# Section 四、正则化技术详解 (regularisation techniques in detail)
4.1 Dropout
原理:训练时随机将一部分神经元输出置零,推理时使用全部神经元(并缩放补偿)。
数学形式:

训练时:

$$\mathbf{r}^{(l)} \sim \text{Bernoulli}(1-p), \qquad \tilde{\mathbf{a}}^{(l)} = \frac{\mathbf{r}^{(l)} \odot \mathbf{a}^{(l)}}{1-p}$$

推理时:

$$\tilde{\mathbf{a}}^{(l)} = \mathbf{a}^{(l)}$$

**Inverted Dropout**(PyTorch使用):训练时除以 $(1-p)$,推理时无需缩放。

```python
class DropoutAnalysis:
    """Small experiments that illustrate how dropout behaves."""

    @staticmethod
    def expected_value_test():
        """Check that inverted dropout keeps the expected activation unchanged."""
        p = 0.5
        x = torch.ones(10000, 100)
        mask = (torch.rand_like(x) > p).float() / (1 - p)
        output = (x * mask).mean()
        # The mean should be close to 1.
        print(f"Dropout(p={p})输出均值: {output.item():.4f} (期望1)")

    @staticmethod
    def variance_test():
        """Placeholder: effect of dropout on gradient variance."""
        # Training: the random mask adds gradient noise, which regularises.
        # Inference: no mask, so the output is deterministic.
        pass


# Run the check
DropoutAnalysis.expected_value_test()
```

### 4.2 Batch Normalization

**原理**:对每个mini-batch的特征做归一化,使其均值为0、方差为1,然后学习缩放和平移。

**训练时**:

$$\hat{x}_i = \frac{x_i - \mu_B}{\sqrt{\sigma_B^2 + \epsilon}}, \quad y_i = \gamma \hat{x}_i + \beta$$
**推理时**:使用移动平均的 $\mu, \sigma^2$:

$$\hat{x} = \frac{x - \mu_{\text{moving}}}{\sqrt{\sigma_{\text{moving}}^2 + \epsilon}}$$
**BatchNorm的优势**:

1. 允许更大学习率
2. 减少对初始化的依赖
3. 提供正则化效果

```python
class BatchNorm1dManual(nn.Module):
    """Hand-written BatchNorm for (batch, features) inputs, for illustration."""

    def __init__(self, num_features, eps=1e-5, momentum=0.1):
        super().__init__()
        self.eps = eps
        self.momentum = momentum

        self.gamma = nn.Parameter(torch.ones(num_features))
        self.beta = nn.Parameter(torch.zeros(num_features))

        self.register_buffer('running_mean', torch.zeros(num_features))
        self.register_buffer('running_var', torch.ones(num_features))

    def forward(self, x):
        if self.training:
            # Batch statistics; the biased variance is used for normalisation.
            mean = x.mean(dim=0)
            var = x.var(dim=0, unbiased=False)

            with torch.no_grad():
                # Like nn.BatchNorm1d, track the *unbiased* variance in the
                # running estimate, and update the buffers in place.
                n = x.size(0)
                unbiased_var = var * n / (n - 1) if n > 1 else var
                self.running_mean.mul_(1 - self.momentum).add_(self.momentum * mean)
                self.running_var.mul_(1 - self.momentum).add_(self.momentum * unbiased_var)
        else:
            mean = self.running_mean
            var = self.running_var

        x_hat = (x - mean) / torch.sqrt(var + self.eps)
        return self.gamma * x_hat + self.beta
```

### 4.3 Layer Normalization

**与BatchNorm的区别**:在**特征维度**而非batch维度归一化。

$$\hat{x}_i = \frac{x_i - \mu_L}{\sqrt{\sigma_L^2 + \epsilon}}, \quad y_i = \gamma \hat{x}_i + \beta$$
其中 $\mu_L = \frac{1}{d} \sum_{j=1}^{d} x_{ij}$。

**优势**:

- 对batch size不敏感
- 适用于变长序列(RNN/Transformer)
- 训练和推理行为一致

```python
class LayerNormManual(nn.Module):
    """Hand-written LayerNorm that normalises over the last dimension."""

    def __init__(self, normalized_shape, eps=1e-5):
        super().__init__()
        self.eps = eps
        self.gamma = nn.Parameter(torch.ones(normalized_shape))
        self.beta = nn.Parameter(torch.zeros(normalized_shape))

    def forward(self, x):
        mean = x.mean(dim=-1, keepdim=True)
        var = x.var(dim=-1, keepdim=True, unbiased=False)
        x_hat = (x - mean) / torch.sqrt(var + self.eps)
        return self.gamma * x_hat + self.beta
```

### 4.4 Weight Decay

**L2正则化**:

$$\mathcal{L}_{\text{reg}} = \mathcal{L}_{\text{data}} + \frac{\lambda}{2} \|\mathbf{W}\|_2^2$$
对应的梯度:

$$\frac{\partial \mathcal{L}_{\text{reg}}}{\partial W} = \frac{\partial \mathcal{L}_{\text{data}}}{\partial W} + \lambda W$$

SGD下的参数更新:

$$W \leftarrow W - \eta \left( \nabla_W \mathcal{L}_{\text{data}} + \lambda W \right)$$
对SGD而言,上面的更新与L2正则化完全等价。Adam则不同:若把 $\lambda W$ 加进梯度(耦合),衰减项会被自适应的二阶矩一并缩放;AdamW改用解耦形式 $W \leftarrow W - \eta \, \hat{m} / (\sqrt{\hat{v}} + \epsilon) - \eta \lambda W$(见5.3节)。

### 4.5 数据增强

```python
from torchvision import transforms


def get_train_transforms():
    """Training-time augmentation pipeline."""
    return transforms.Compose([
        # NOTE: horizontal flips suit Fashion-MNIST; for MNIST digits a
        # mirrored image is no longer a valid sample, so drop this line there.
        transforms.RandomHorizontalFlip(),
        transforms.RandomCrop(28, padding=4),
        transforms.ColorJitter(brightness=0.2, contrast=0.2),
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,)),
    ])


def get_test_transforms():
    """Test-time pipeline (no augmentation)."""
    return transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,)),
    ])
```

---

## 五、优化器

### 5.1 SGD with Momentum

```python
class SGD:
    """SGD with momentum and optional Nesterov look-ahead."""

    def __init__(self, params, lr=0.01, momentum=0.9, weight_decay=0.0, nesterov=False):
        self.params = list(params)
        self.lr = lr
        self.momentum = momentum
        self.weight_decay = weight_decay
        self.nesterov = nesterov

        # One momentum buffer per parameter.
        self.velocities = [torch.zeros_like(p) for p in self.params]

    @torch.no_grad()  # the update itself must not be recorded by autograd
    def step(self):
        """Apply one parameter update."""
        for i, param in enumerate(self.params):
            if param.grad is None:
                continue

            grad = param.grad

            # Coupled weight decay (L2 penalty added to the gradient).
            if self.weight_decay != 0:
                grad = grad + self.weight_decay * param

            self.velocities[i] = self.momentum * self.velocities[i] + grad

            if self.nesterov:
                update = self.momentum * self.velocities[i] + grad
            else:
                update = self.velocities[i]

            param -= self.lr * update

    def zero_grad(self):
        for param in self.params:
            if param.grad is not None:
                param.grad.zero_()


# Equivalent PyTorch optimizer
optimizer = torch.optim.SGD(
    model.parameters(),
    lr=0.01,
    momentum=0.9,
    weight_decay=1e-4,
    nesterov=True
)
```

### 5.2 Adam优化器

```python
class Adam:
    """Adam optimizer with coupled (L2) weight decay."""

    def __init__(self, params, lr=1e-3, betas=(0.9, 0.999), eps=1e-8, weight_decay=0.0):
        self.params = list(params)
        self.lr = lr
        self.beta1, self.beta2 = betas
        self.eps = eps
        self.weight_decay = weight_decay

        self.m = [torch.zeros_like(p) for p in self.params]  # first moment
        self.v = [torch.zeros_like(p) for p in self.params]  # second moment
        self.t = 0  # time step

    @torch.no_grad()
    def step(self):
        self.t += 1

        for i, param in enumerate(self.params):
            if param.grad is None:
                continue

            grad = param.grad

            # Weight decay as an L2 term: it is rescaled by the adaptive
            # denominator below, which is what AdamW avoids.
            if self.weight_decay != 0:
                grad = grad + self.weight_decay * param

            self.m[i] = self.beta1 * self.m[i] + (1 - self.beta1) * grad
            self.v[i] = self.beta2 * self.v[i] + (1 - self.beta2) * grad ** 2

            # Bias correction
            m_hat = self.m[i] / (1 - self.beta1 ** self.t)
            v_hat = self.v[i] / (1 - self.beta2 ** self.t)

            param -= self.lr * m_hat / (torch.sqrt(v_hat) + self.eps)

    def zero_grad(self):
        for param in self.params:
            if param.grad is not None:
                param.grad.zero_()


# Equivalent PyTorch optimizer
optimizer = torch.optim.Adam(
    model.parameters(),
    lr=1e-3,
    betas=(0.9, 0.999),
    weight_decay=1e-4
)
```

### 5.3 AdamW优化器(解耦Weight Decay)

```python
class AdamW:
    """AdamW optimizer (decoupled weight decay)."""

    def __init__(self, params, lr=1e-3, betas=(0.9, 0.999), eps=1e-8, weight_decay=1e-2):
        self.params = list(params)
        self.lr = lr
        self.beta1, self.beta2 = betas
        self.eps = eps
        self.weight_decay = weight_decay

        self.m = [torch.zeros_like(p) for p in self.params]
        self.v = [torch.zeros_like(p) for p in self.params]
        self.t = 0

    @torch.no_grad()
    def step(self):
        self.t += 1

        for i, param in enumerate(self.params):
            if param.grad is None:
                continue

            grad = param.grad

            # Adam moments computed from the raw gradient (no weight decay).
            self.m[i] = self.beta1 * self.m[i] + (1 - self.beta1) * grad
            self.v[i] = self.beta2 * self.v[i] + (1 - self.beta2) * grad ** 2

            m_hat = self.m[i] / (1 - self.beta1 ** self.t)
            v_hat = self.v[i] / (1 - self.beta2 ** self.t)

            # Adam step plus decoupled decay applied directly to the weights.
            param -= self.lr * (m_hat / (torch.sqrt(v_hat) + self.eps) + self.weight_decay * param)

    def zero_grad(self):
        for param in self.params:
            if param.grad is not None:
                param.grad.zero_()


# Equivalent PyTorch optimizer
optimizer = torch.optim.AdamW(
    model.parameters(),
    lr=1e-3,
    weight_decay=0.01
)
```

---

## 六、学习率调度

### 6.1 Step Decay

```python
class StepLR:
    """Multiply the learning rate by ``gamma`` every ``step_size`` epochs."""

    def __init__(self, optimizer, step_size=30, gamma=0.1):
        self.optimizer = optimizer
        self.step_size = step_size
        self.gamma = gamma
        self.last_epoch = -1

    def step(self):
        self.last_epoch += 1
        if self.last_epoch > 0 and self.last_epoch % self.step_size == 0:
            for param_group in self.optimizer.param_groups:
                param_group['lr'] *= self.gamma
```

### 6.2 Cosine Annealing

```python
import math


class CosineAnnealingLR:
    """Cosine annealing from the initial lr down to ``eta_min`` over ``T_max`` epochs."""

    def __init__(self, optimizer, T_max, eta_min=0):
        self.optimizer = optimizer
        self.T_max = T_max
        self.eta_min = eta_min
        self.last_epoch = -1
        # Remember the initial lr of every group: the closed-form schedule is
        # defined relative to it, not to the already-decayed current lr.
        self.base_lrs = [group['lr'] for group in optimizer.param_groups]

    def step(self):
        self.last_epoch += 1
        progress = min(self.last_epoch, self.T_max) / self.T_max
        for param_group, base_lr in zip(self.optimizer.param_groups, self.base_lrs):
            param_group['lr'] = self.eta_min + 0.5 * (base_lr - self.eta_min) * (
                1 + math.cos(math.pi * progress)
            )
```

### 6.3 Warmup + Cosine

```python
class WarmupCosineLR:
    """Linear warmup followed by cosine annealing."""

    def __init__(self, optimizer, warmup_epochs, total_epochs, eta_min=0):
        self.optimizer = optimizer
        self.warmup_epochs = warmup_epochs
        self.total_epochs = total_epochs
        self.eta_min = eta_min
        self.last_epoch = -1
        # Capture the base lr once; param_group['lr'] is overwritten on every
        # step and can no longer be used as the reference afterwards.
        self.base_lrs = [
            group.get('base_lr', group['lr']) for group in optimizer.param_groups
        ]

    def step(self):
        self.last_epoch += 1

        for param_group, base_lr in zip(self.optimizer.param_groups, self.base_lrs):
            if self.last_epoch < self.warmup_epochs:
                # Warmup: linear ramp up to base_lr
                lr = base_lr * (self.last_epoch + 1) / self.warmup_epochs
            else:
                # Cosine annealing; clamp so extra steps stay at eta_min
                decay_epochs = max(1, self.total_epochs - self.warmup_epochs)
                progress = min(1.0, (self.last_epoch - self.warmup_epochs) / decay_epochs)
                lr = self.eta_min + 0.5 * (base_lr - self.eta_min) * (1 + math.cos(math.pi * progress))

            param_group['lr'] = lr
```

### 6.4 PyTorch原生调度

```python
# Step decay
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=30, gamma=0.1)

# Cosine annealing
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=100)

# Warmup + Cosine
scheduler = torch.optim.lr_scheduler.SequentialLR(
    optimizer,
    schedulers=[
        torch.optim.lr_scheduler.LinearLR(optimizer, start_factor=0.1, total_iters=5),
        torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=95)
    ],
    milestones=[5]
)
```

---

## 七、完整训练流水线

### 7.1 训练器类

```python
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
import time


class MLPTrainer:
    """Training loop for an MLP classifier."""

    def __init__(self, model, optimizer, scheduler=None, device='cuda'):
        self.model = model.to(device)
        self.optimizer = optimizer
        self.scheduler = scheduler
        self.device = device

        # Per-epoch history
        self.history = {
            'train_loss': [], 'train_acc': [],
            'val_loss': [], 'val_acc': [],
            'lr': []
        }

    def train_epoch(self, train_loader, criterion):
        """Train for one epoch; return (mean loss, accuracy)."""
        self.model.train()
        total_loss = 0
        correct = 0
        total = 0

        for batch_x, batch_y in train_loader:
            batch_x = batch_x.to(self.device)
            batch_y = batch_y.to(self.device)

            self.optimizer.zero_grad()
            outputs = self.model(batch_x)
            loss = criterion(outputs, batch_y)
            loss.backward()

            # Gradient clipping
            torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)

            self.optimizer.step()

            total_loss += loss.item() * batch_x.size(0)
            _, predicted = outputs.max(1)
            total += batch_y.size(0)
            correct += predicted.eq(batch_y).sum().item()

        return total_loss / total, correct / total

    def evaluate(self, val_loader, criterion):
        """Evaluate without gradients; return (mean loss, accuracy)."""
        self.model.eval()
        total_loss = 0
        correct = 0
        total = 0

        with torch.no_grad():
            for batch_x, batch_y in val_loader:
                batch_x = batch_x.to(self.device)
                batch_y = batch_y.to(self.device)

                outputs = self.model(batch_x)
                loss = criterion(outputs, batch_y)

                total_loss += loss.item() * batch_x.size(0)
                _, predicted = outputs.max(1)
                total += batch_y.size(0)
                correct += predicted.eq(batch_y).sum().item()

        return total_loss / total, correct / total

    def fit(self, train_loader, val_loader, epochs, criterion):
        """Full training run; restores the best-validation weights at the end."""
        best_val_acc = 0
        best_state = None

        for epoch in range(epochs):
            start = time.time()

            train_loss, train_acc = self.train_epoch(train_loader, criterion)

            if val_loader is not None:
                val_loss, val_acc = self.evaluate(val_loader, criterion)
                self.history['val_loss'].append(val_loss)
                self.history['val_acc'].append(val_acc)

                if val_acc > best_val_acc:
                    best_val_acc = val_acc
                    best_state = {k: v.cpu().clone() for k, v in self.model.state_dict().items()}
            else:
                val_loss, val_acc = 0, 0

            if self.scheduler is not None:
                if isinstance(self.scheduler, torch.optim.lr_scheduler.ReduceLROnPlateau):
                    # Without validation data fall back to the training loss
                    # rather than feeding the scheduler a constant 0.
                    monitored = val_loss if val_loader is not None else train_loss
                    self.scheduler.step(monitored)
                else:
                    self.scheduler.step()

            self.history['train_loss'].append(train_loss)
            self.history['train_acc'].append(train_acc)
            current_lr = self.optimizer.param_groups[0]['lr']
            self.history['lr'].append(current_lr)

            elapsed = time.time() - start
            print(f"Epoch {epoch+1}/{epochs} ({elapsed:.1f}s) - "
                  f"train_loss: {train_loss:.4f}, train_acc: {train_acc:.4f} - "
                  f"val_loss: {val_loss:.4f}, val_acc: {val_acc:.4f} - "
                  f"lr: {current_lr:.2e}")

        # Restore the best model
        if best_state is not None:
            self.model.load_state_dict(best_state)
            print(f"\n最佳验证准确率: {best_val_acc:.4f}")

        return self.history
```

### 7.2 完整使用示例

```python
def train_mlp_mnist():
    """End-to-end example: train the MLP on MNIST."""
    from torchvision import datasets, transforms

    # Data
    transform_train = transforms.Compose([
        transforms.RandomCrop(28, padding=2),
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,))
    ])
    transform_test = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,))
    ])

    train_dataset = datasets.MNIST('./data', train=True, download=True, transform=transform_train)
    test_dataset = datasets.MNIST('./data', train=False, transform=transform_test)

    train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True, num_workers=2)
    test_loader = DataLoader(test_dataset, batch_size=256, shuffle=False, num_workers=2)

    # Model
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    model = MLP(
        input_dim=784,
        hidden_dims=[512, 256, 128],
        output_dim=10,
        activation='gelu',
        output_activation='log_softmax',
        use_batchnorm=True,
        dropout_rate=0.3,
        init_method='he',
        weight_decay=1e-4
    )

    # Optimizer
    optimizer = torch.optim.AdamW(model.parameters(), lr=1e-3, weight_decay=1e-2)

    # LR schedule: 3 warmup epochs, then cosine annealing
    scheduler = torch.optim.lr_scheduler.SequentialLR(
        optimizer,
        schedulers=[
            torch.optim.lr_scheduler.LinearLR(optimizer, start_factor=0.1, total_iters=3),
            torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=27)
        ],
        milestones=[3]
    )

    # The model already outputs log-probabilities, so pair it with NLLLoss;
    # CrossEntropyLoss would apply log_softmax a second time.
    criterion = nn.NLLLoss()

    # NOTE: the test set doubles as the validation set here for brevity, so
    # the reported best accuracy is optimistic; hold out part of the training
    # data for model selection in real experiments.
    trainer = MLPTrainer(model, optimizer, scheduler, device=device)
    history = trainer.fit(
        train_loader, test_loader,
        epochs=30, criterion=criterion
    )

    return model, history


# Run
if __name__ == '__main__':
    model, history = train_mlp_mnist()
```

### 7.3 监控与可视化

```python
import matplotlib.pyplot as plt


def plot_training_history(history):
    """Plot loss, accuracy and learning-rate curves."""
    fig, axes = plt.subplots(1, 3, figsize=(15, 4))

    # Loss
    axes[0].plot(history['train_loss'], label='Train Loss')
    axes[0].plot(history['val_loss'], label='Val Loss')
    axes[0].set_xlabel('Epoch')
    axes[0].set_ylabel('Loss')
    axes[0].legend()
    axes[0].set_title('Loss Curve')

    # Accuracy
    axes[1].plot(history['train_acc'], label='Train Acc')
    axes[1].plot(history['val_acc'], label='Val Acc')
    axes[1].set_xlabel('Epoch')
    axes[1].set_ylabel('Accuracy')
    axes[1].legend()
    axes[1].set_title('Accuracy Curve')

    # Learning rate
    axes[2].plot(history['lr'])
    axes[2].set_xlabel('Epoch')
    axes[2].set_ylabel('Learning Rate')
    axes[2].set_yscale('log')
    axes[2].set_title('LR Schedule')

    plt.tight_layout()
    plt.savefig('training_history.png', dpi=100)
    plt.show()
```

---

## 八、调试技巧

### 8.1 常见问题排查

**问题1:Loss不下降**

```python
# Check 1: learning rate
print(f"当前lr: {optimizer.param_groups[0]['lr']}")
# Try: 10x smaller or 10x larger

# Check 2: gradients
for name, param in model.named_parameters():
    if param.grad is not None:
        print(f"{name}: grad mean={param.grad.mean():.2e}, std={param.grad.std():.2e}")

# Check 3: input normalisation
print(f"输入统计: mean={x.mean():.4f}, std={x.std():.4f}")
```

**问题2:过拟合**

```python
# Add regularisation
model = MLP(..., dropout_rate=0.5, weight_decay=1e-3)

# Data augmentation
transform = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.RandomCrop(28, padding=4),
    # ...
])

# Early stopping
early_stop_patience = 10
```

**问题3:梯度消失/爆炸**

```python
# Monitor activations
def hook_fn(name):
    def fn(module, input, output):
        print(f"{name}: mean={output.mean():.4f}, std={output.std():.4f}")
    return fn

for name, module in model.named_modules():
    if isinstance(module, nn.Linear):
        module.register_forward_hook(hook_fn(name))

# Monitor gradients
def grad_hook(name):
    def fn(grad):
        print(f"{name}: grad mean={grad.mean():.4e}, std={grad.std():.4e}")
    return fn

for name, param in model.named_parameters():
    if 'weight' in name:
        param.register_hook(grad_hook(name))
```

### 8.2 单元测试

```python
def test_mlp_forward_backward():
    """Forward and backward pass produce the right shape and finite gradients."""
    torch.manual_seed(42)

    model = MLP(10, [20, 15], 5, activation='relu', output_activation='log_softmax')

    # Forward
    x = torch.randn(3, 10)
    y_pred = model(x)
    assert y_pred.shape == (3, 5), f"输出形状错误: {y_pred.shape}"

    # Backward
    y_true = torch.tensor([0, 2, 4])
    criterion = nn.NLLLoss()
    loss = criterion(y_pred, y_true)
    loss.backward()

    # Every parameter must have received a finite gradient
    for name, param in model.named_parameters():
        assert param.grad is not None, f"{name} 梯度为None"
        assert torch.isfinite(param.grad).all(), f"{name} 梯度非有限值"


def test_dropout():
    """Dropout is random in train mode and a no-op in eval mode."""
    model = nn.Sequential(nn.Linear(10, 5), nn.Dropout(0.5))

    # Train mode: outputs should differ between calls
    model.train()
    x = torch.ones(100, 10)
    out1 = model(x)
    out2 = model(x)
    assert not torch.allclose(out1, out2), "训练时Dropout未生效"

    # Eval mode: outputs should be stable
    model.eval()
    out1 = model(x)
    out2 = model(x)
    assert torch.allclose(out1, out2), "推理时输出不一致"


def test_batchnorm():
    """BatchNorm uses batch statistics in train mode and running statistics in eval mode."""
    bn = nn.BatchNorm1d(5)

    # Train mode: the output is normalised with the batch statistics
    bn.train()
    x = torch.randn(100, 5)
    out_train = bn(x)
    assert torch.allclose(out_train.mean(dim=0), torch.zeros(5), atol=1e-5), "均值不为0"

    # Eval mode: after a single update the running statistics still differ
    # from the batch statistics, so compare against the running-stat formula
    # instead of against the training output.
    bn.eval()
    out_eval = bn(x)
    expected = (x - bn.running_mean) / torch.sqrt(bn.running_var + bn.eps)
    assert torch.allclose(out_eval, expected, atol=1e-5), "推理时未使用移动平均统计量"
```

---

## 九、最佳实践

### 9.1 MLP设计原则

1. **宽度 vs 深度**:宽度增加时参数量与计算量近似按平方增长(相邻两层宽度均为 $d$ 时每层为 $O(d^2)$),深度增加时线性增长。在相同参数预算下,增加深度通常更有效。
2. **瓶颈设计**:避免维度急剧下降(如 1024 → 10),中间层应平滑过渡。
3. 
**残差连接**:对于很深的MLP,残差连接可缓解梯度消失。

```python
class ResidualBlock(nn.Module):
    """Pre-norm residual MLP block: LayerNorm -> Linear(4x) -> act -> Linear."""

    def __init__(self, dim, activation='gelu', dropout=0.1):
        super().__init__()
        self.norm = nn.LayerNorm(dim)
        self.fc1 = nn.Linear(dim, 4 * dim)
        self.fc2 = nn.Linear(4 * dim, dim)
        # Any value other than 'gelu' selects ReLU.
        self.activation = nn.GELU() if activation == 'gelu' else nn.ReLU()
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        residual = x
        x = self.norm(x)
        x = self.fc1(x)
        x = self.activation(x)
        x = self.dropout(x)
        x = self.fc2(x)
        x = self.dropout(x)
        return x + residual  # residual connection
```

### 9.2 训练策略

1. **学习率搜索**:从 1e-4 到 1e-1 指数扫描
2. **学习率warmup**:前几个epoch从0线性增加
3. **早停**:监控验证损失,10-20 epoch无改善则停止
4. **模型集成**:训练多个模型并平均预测

### 9.3 部署优化

1. **量化**:FP32 → INT8(4倍压缩)
2. **剪枝**:去除小权重连接
3. **蒸馏**:训练小模型模仿大模型
4. **ONNX导出**:跨平台部署

```python
# ONNX export
dummy_input = torch.randn(1, 784)
torch.onnx.export(
    model,
    dummy_input,
    "mlp.onnx",
    input_names=['input'],
    output_names=['output'],
    dynamic_axes={'input': {0: 'batch'}, 'output': {0: 'batch'}}
)
```

---

## 十、参考资料

[^1]: Goodfellow, I., Bengio, Y., & Courville, A. (2016). *Deep Learning*. MIT Press. Chapter 6: Deep Feedforward Networks.

---

*最后更新:2026-06-21*