概述
N-BEATS(Neural Basis Expansion Analysis Time-Series)是由 Oreshkin 等人于2019年提出的深度学习时间序列预测模型,它是首个在M3/M4预测竞赛中超越统计方法的纯深度学习模型。1 N-HiTS(Neural Hierarchical Interpolation for Time-Series)则是 N-BEATS 的改进版本,通过多速率采样和层次插值解决了长预测范围精度下降的问题。2
这两个模型的核心思想是利用基础展开分析(Basis Expansion)的数学框架,将复杂的时间序列分解为可解释的成分。
一、N-BEATS:基础展开分析
1.1 核心思想
N-BEATS 的名字来源于统计学中的”基础展开”(Basis Expansion)方法。在统计学中,任何函数都可以表示为一组基函数的线性组合:
其中 是基函数, 是系数。
N-BEATS 的创新:用神经网络学习这些基函数和系数,而不是手工设计。
1.2 栈结构(Block Stack)
N-BEATS 由多个栈(Stack)组成,每个栈包含多个块(Block)。每个块的结构如下:
输入 (回看窗口)
↓
全连接层
↓
多层感知机 (共享θ)
↓
基础层 (Backcast + Forecast 输出)
↓
输出 (Backcast预测 + Forecast预测)
数学表述:
对于第 个块,输入为 ,输出为:
其中:
- :回看预测(用于堆叠传递)
- :前向预测(最终输出)
1.3 双重残差栈
N-BEATS 使用双重残差结构来组织多个块:
第一个栈:趋势栈(Trend Stack)
class TrendBlock(nn.Module):
def __init__(self, n_layers=4, n_units=512, basis_dim=256):
super().__init__()
self.layers = nn.ModuleList([
nn.Linear(n_units, n_units) for _ in range(n_layers)
])
self.basis_function = nn.Linear(n_units, 2 * basis_dim) # θ的维度
def forward(self, x):
# x: [batch, lookback_window]
for layer in self.layers:
x = F.gelu(layer(x)) # GELU激活
# 生成傅里叶/多项式基系数
theta = self.basis_function(x) # [batch, 2*basis_dim]
# 生成回看和预测
backcast = self.trend_basis(x, theta[:, :basis_dim])
forecast = self.trend_basis(x, theta[:, basis_dim:])
return backcast, forecast
def trend_basis(self, x, theta):
"""
趋势基函数:多项式或傅里叶基
这里使用傅里叶基作为示例
"""
# 简化的实现
return torch.matmul(x, theta)第二个栈:季节栈(Seasonal Stack)
季节栈的结构与趋势栈类似,但使用不同的基函数:
1.4 栈间连接
每个块的回看预测用于更新下一个块的输入,形成栈内残差连接:
栈的最终输出是所有块的预测之和:
1.5 损失函数
N-BEATS 的损失函数同时考虑回看拟合和预测精度:
其中 是回看损失权重(通常设为0)。
二、N-HiTS:层次插值改进
2.1 N-BEATS的局限性
N-BEATS 在短预测范围(few steps)上表现优秀,但随着预测步长增加,性能急剧下降。原因包括:
- 感受野固定:所有块使用相同的回看窗口
- 混合频率信息:长短期信号混在一起,难以分别处理
- 插值简单:直接求和,没有考虑不同频率的特性
2.2 核心创新:多速率采样
N-HiTS 的第一个关键创新是多速率采样(Multi-Rate Sampling)。
频率分离
在每个块内部,N-HiTS 使用下采样将输入分离为不同频率的成分:
class MultiRateSampler(nn.Module):
def __init__(self, in_len, downsample_factors=[2, 2]):
super().__init__()
self.pools = nn.ModuleList([
nn.MaxPool1d(kernel_size=f) for f in downsample_factors
])
def forward(self, x):
"""
x: [batch, in_len, features]
返回不同采样率的特征
"""
features = [x.transpose(1, 2)] # [batch, features, in_len]
for pool in self.pools:
downsampled = pool(features[-1])
features.append(downsampled)
return features[::-1] # 从粗到细频率特定块
每个频率成分由专门的块处理:
- 粗粒度块:处理低频(长期)信息
- 细粒度块:处理高频(短期)信息
2.3 核心创新:层次插值
N-HiTS 的第二个关键创新是层次插值(Hierarchical Interpolation)。
问题
传统方法在预测时,所有块输出直接求和。但不同频率的预测具有不同的分辨率:
- 低频块输出短序列
- 高频块输出长序列
插值机制
class HierarchicalInterpolator(nn.Module):
def __init__(self, forecast_len, interpolation_modes=['linear', 'nearest']):
super().__init__()
self.interp_modes = interpolation_modes
def interpolate(self, x, target_len, mode='linear'):
"""
将短序列插值到目标长度
"""
if mode == 'linear':
# 线性插值
x_expanded = F.interpolate(
x.unsqueeze(1), # [batch, 1, len]
size=target_len,
mode='linear',
align_corners=True
).squeeze(1)
elif mode == 'cubic':
# 三次插值
x_expanded = F.interpolate(
x.unsqueeze(1),
size=target_len,
mode='cubic',
align_corners=True
).squeeze(1)
else:
# 最近邻
x_expanded = F.interpolate(
x.unsqueeze(1),
size=target_len,
mode='nearest'
).squeeze(1)
return x_expanded2.4 N-HiTS 完整架构
class NHiTS(nn.Module):
def __init__(self, in_len, out_len, n_blocks=3, n_layers=2,
n_units=512, pooling_size=2):
super().__init__()
self.blocks = nn.ModuleList()
for i in range(n_blocks):
# 每个块有不同的下采样率
downsample_factor = pooling_size ** i
block_in_len = in_len // downsample_factor
block_out_len = out_len // downsample_factor
self.blocks.append(
NHiTSBlock(
in_len=block_in_len,
out_len=block_out_len,
n_layers=n_layers,
n_units=n_units
)
)
# 层次插值器
self.interpolators = nn.ModuleList([
HierarchicalInterpolator(out_len, out_len // (pooling_size ** i))
for i in range(n_blocks)
])
def forward(self, x):
"""
x: [batch, in_len, n_features]
"""
forecasts = []
for i, (block, interp) in enumerate(zip(self.blocks, self.interpolators)):
down_x = self.downsample(x, pooling_size ** i)
_, forecast = block(down_x)
# 插值到目标长度
forecast_upsampled = interp.interpolate(forecast, self.out_len,
mode='linear')
forecasts.append(forecast_upsampled)
# 加权融合
return torch.stack(forecasts, dim=-1).sum(dim=-1)
def downsample(self, x, factor):
"""下采样"""
return x[:, ::factor, :]三、对比分析
3.1 N-BEATS vs N-HiTS
| 特性 | N-BEATS | N-HiTS |
|---|---|---|
| 采样方式 | 固定速率 | 多速率 |
| 频率处理 | 混合 | 分离 |
| 插值机制 | 直接求和 | 层次插值 |
| 长期预测 | 精度下降 | 精度保持 |
| 参数量 | 较大 | 较小(参数共享) |
| 可解释性 | 趋势/季节栈分离 | 频率成分分离 |
3.2 数学对比
N-BEATS 的输出
其中 是第 个块的基础展开函数。
N-HiTS 的输出
其中 是第 个块的插值函数, 是下采样后的输入。
3.3 在M4数据集上的表现
| 模型 | SMAPE | MASE |
|---|---|---|
| N-BEATS | 11.7% | 0.83 |
| N-HiTS | 11.2% | 0.79 |
| Transformer | 13.1% | 0.91 |
| ARIMA | 12.5% | 0.88 |
四、PyTorch完整实现
4.1 N-HiTS 完整代码
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
class NHiTSBlock(nn.Module):
"""N-HiTS 块"""
def __init__(self, in_len, out_len, n_layers=2, n_units=512,
n_hidden_units=128, pooling_kernel_size=4):
super().__init__()
self.in_len = in_len
self.out_len = out_len
# 输入投影
self.input_layer = nn.Linear(in_len, n_units)
# 多层感知机
self.layers = nn.ModuleList([
nn.Linear(n_units, n_units) for _ in range(n_layers)
])
# 频率感知的下采样
self.pooling = nn.MaxPool1d(kernel_size=pooling_kernel_size,
stride=pooling_kernel_size)
# Backcast 和 Forecast 网络
hidden_len = in_len // pooling_kernel_size
self.backcast_fc = nn.Sequential(
nn.Linear(n_units, n_hidden_units),
nn.ReLU(),
nn.Linear(n_hidden_units, hidden_len)
)
forecast_len = out_len // pooling_kernel_size
self.forecast_fc = nn.Sequential(
nn.Linear(n_units, n_hidden_units),
nn.ReLU(),
nn.Linear(n_hidden_units, forecast_len)
)
def forward(self, x):
"""
x: [batch, in_len]
"""
# 输入投影
h = self.input_layer(x)
# 多层处理
for layer in self.layers:
h = F.gelu(layer(h) + h) # 残差连接
# 生成预测
backcast = self.backcast_fc(h)
forecast = self.forecast_fc(h)
return backcast, forecast
class NHiTSModel(nn.Module):
"""完整的 N-HiTS 模型"""
def __init__(self, in_len=7*24, out_len=24, n_blocks=3,
n_layers=2, n_units=512, n_hidden=128):
super().__init__()
self.in_len = in_len
self.out_len = out_len
# 构建多个块(从粗到细)
self.blocks = nn.ModuleList()
pooling_kernel_size = 2
for i in range(n_blocks):
block_in_len = in_len // (pooling_kernel_size ** i)
block_out_len = out_len // (pooling_kernel_size ** i)
self.blocks.append(
NHiTSBlock(
in_len=block_in_len,
out_len=block_out_len,
n_layers=n_layers,
n_units=n_units,
n_hidden_units=n_hidden,
pooling_kernel_size=pooling_kernel_size
)
)
def forward(self, x):
"""
x: [batch, in_len]
"""
# x 是回看窗口
total_forecast = torch.zeros(x.shape[0], self.out_len,
device=x.device)
for block in self.blocks:
backcast, forecast = block(x)
# 上采样回backcast长度并相减
if backcast.shape[1] < x.shape[1]:
backcast_up = F.interpolate(
backcast.unsqueeze(1), size=x.shape[1],
mode='linear', align_corners=True
).squeeze(1)
x = x - backcast_up
# 上采样forecast并累加
forecast_up = F.interpolate(
forecast.unsqueeze(1), size=self.out_len,
mode='linear', align_corners=True
).squeeze(1)
total_forecast = total_forecast + forecast_up
return total_forecast4.2 训练脚本
import numpy as np
import pandas as pd
import torch
from torch.utils.data import DataLoader, TensorDataset
def create_sequences(data, in_len, out_len):
"""创建训练序列"""
X, y = [], []
for i in range(len(data) - in_len - out_len + 1):
X.append(data[i:i+in_len])
y.append(data[i+in_len:i+in_len+out_len])
return np.array(X), np.array(y)
def train_nhits(model, train_data, val_data, epochs=100, batch_size=64, lr=1e-3):
"""训练 N-HiTS 模型"""
# 准备数据
X_train, y_train = create_sequences(train_data, model.in_len, model.out_len)
X_val, y_val = create_sequences(val_data, model.in_len, model.out_len)
train_loader = DataLoader(
TensorDataset(torch.FloatTensor(X_train), torch.FloatTensor(y_train)),
batch_size=batch_size, shuffle=True
)
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
optimizer, patience=10, factor=0.5
)
best_val_loss = float('inf')
for epoch in range(epochs):
# 训练
model.train()
train_loss = 0
for X_batch, y_batch in train_loader:
optimizer.zero_grad()
y_pred = model(X_batch)
loss = F.mse_loss(y_pred, y_batch)
loss.backward()
optimizer.step()
train_loss += loss.item()
# 验证
model.eval()
with torch.no_grad():
y_val_pred = model(torch.FloatTensor(X_val))
val_loss = F.mse_loss(y_val_pred, torch.FloatTensor(y_val))
scheduler.step(val_loss)
if val_loss < best_val_loss:
best_val_loss = val_loss
torch.save(model.state_dict(), 'nhits_best.pt')
if epoch % 10 == 0:
print(f"Epoch {epoch}: Train Loss = {train_loss/len(train_loader):.4f}, "
f"Val Loss = {val_loss:.4f}")
return model五、实际应用示例
5.1 电力负荷预测
# 加载数据(假设是每小时的电力负荷)
data = pd.read_csv('electricity.csv')['load'].values
# 归一化
from sklearn.preprocessing import StandardScaler
scaler = StandardScaler()
data_scaled = scaler.fit_transform(data.reshape(-1, 1)).flatten()
# 划分数据
train_size = int(len(data_scaled) * 0.8)
train_data = data_scaled[:train_size]
test_data = data_scaled[train_size:]
# 创建模型
model = NHiTSModel(
in_len=7*24, # 回看一周
out_len=24, # 预测一天
n_blocks=3,
n_layers=2,
n_units=512
)
# 训练
model = train_nhits(model, train_data, test_data, epochs=100)
# 预测
model.load_state_dict(torch.load('nhits_best.pt'))
model.eval()
with torch.no_grad():
# 预测未来24小时
last_window = torch.FloatTensor(train_data[-7*24:]).unsqueeze(0)
forecast = model(last_window)
# 反归一化
forecast_unscaled = scaler.inverse_transform(
forecast.numpy().reshape(-1, 1)
).flatten()5.2 评估指标
def evaluate_forecast(y_true, y_pred, scaler=None):
"""评估预测性能"""
if scaler:
y_true = scaler.inverse_transform(y_true.reshape(-1, 1)).flatten()
y_pred = scaler.inverse_transform(y_pred.reshape(-1, 1)).flatten()
# MSE / RMSE
mse = np.mean((y_true - y_pred) ** 2)
rmse = np.sqrt(mse)
# MAE
mae = np.mean(np.abs(y_true - y_pred))
# MAPE
mape = np.mean(np.abs((y_true - y_pred) / (y_true + 1e-8))) * 100
# SMAPE
smape = 200 * np.mean(np.abs(y_pred - y_true) /
(np.abs(y_true) + np.abs(y_pred) + 1e-8))
print(f"RMSE: {rmse:.4f}")
print(f"MAE: {mae:.4f}")
print(f"MAPE: {mape:.2f}%")
print(f"SMAPE: {smape:.2f}%")
return {'rmse': rmse, 'mae': mae, 'mape': mape, 'smape': smape}六、参考
相关阅读
Footnotes
-
Oreshkin, B. N., Carpov, D., Chapados, N., & Bengio, Y. (2019). N-BEATS: Neural basis expansion analysis for interpretable time series forecasting. arXiv:1905.10437. ↩
-
Challu, C., Olivares, K. G., Oreshkin, B. N., Garza, F. G., Canseco, M. M., & Dubrawski, A. (2023). NHITS: Neural hierarchical interpolation for time series forecasting. Pattern Recognition, 123, 108372. ↩