概述

N-BEATS(Neural Basis Expansion Analysis Time-Series)是由 Oreshkin 等人于2019年提出的深度学习时间序列预测模型,它是首个在M3/M4预测竞赛中超越统计方法的纯深度学习模型。1 N-HiTS(Neural Hierarchical Interpolation for Time-Series)则是 N-BEATS 的改进版本,通过多速率采样层次插值解决了长预测范围精度下降的问题。2

这两个模型的核心思想是利用基础展开分析(Basis Expansion)的数学框架,将复杂的时间序列分解为可解释的成分。


一、N-BEATS:基础展开分析

1.1 核心思想

N-BEATS 的名字来源于统计学中的”基础展开”(Basis Expansion)方法。在统计学中,任何函数都可以表示为一组基函数的线性组合:

其中 是基函数, 是系数。

N-BEATS 的创新:用神经网络学习这些基函数和系数,而不是手工设计。

1.2 栈结构(Block Stack)

N-BEATS 由多个(Stack)组成,每个栈包含多个(Block)。每个块的结构如下:

输入 (回看窗口) 
    ↓
  全连接层
    ↓
  多层感知机 (共享θ)
    ↓
  基础层 (Backcast + Forecast 输出)
    ↓
  输出 (Backcast预测 + Forecast预测)

数学表述

对于第 个块,输入为 ,输出为:

其中:

  • :回看预测(用于堆叠传递)
  • :前向预测(最终输出)

1.3 双重残差栈

N-BEATS 使用双重残差结构来组织多个块:

第一个栈:趋势栈(Trend Stack)

class TrendBlock(nn.Module):
    def __init__(self, n_layers=4, n_units=512, basis_dim=256):
        super().__init__()
        self.layers = nn.ModuleList([
            nn.Linear(n_units, n_units) for _ in range(n_layers)
        ])
        self.basis_function = nn.Linear(n_units, 2 * basis_dim)  # θ的维度
        
    def forward(self, x):
        # x: [batch, lookback_window]
        for layer in self.layers:
            x = F.gelu(layer(x))  # GELU激活
        
        # 生成傅里叶/多项式基系数
        theta = self.basis_function(x)  # [batch, 2*basis_dim]
        
        # 生成回看和预测
        backcast = self.trend_basis(x, theta[:, :basis_dim])
        forecast = self.trend_basis(x, theta[:, basis_dim:])
        
        return backcast, forecast
    
    def trend_basis(self, x, theta):
        """
        趋势基函数:多项式或傅里叶基
        这里使用傅里叶基作为示例
        """
        # 简化的实现
        return torch.matmul(x, theta)

第二个栈:季节栈(Seasonal Stack)

季节栈的结构与趋势栈类似,但使用不同的基函数:

1.4 栈间连接

每个块的回看预测用于更新下一个块的输入,形成栈内残差连接:

栈的最终输出是所有块的预测之和:

1.5 损失函数

N-BEATS 的损失函数同时考虑回看拟合和预测精度:

其中 是回看损失权重(通常设为0)。


二、N-HiTS:层次插值改进

2.1 N-BEATS的局限性

N-BEATS 在短预测范围(few steps)上表现优秀,但随着预测步长增加,性能急剧下降。原因包括:

  1. 感受野固定:所有块使用相同的回看窗口
  2. 混合频率信息:长短期信号混在一起,难以分别处理
  3. 插值简单:直接求和,没有考虑不同频率的特性

2.2 核心创新:多速率采样

N-HiTS 的第一个关键创新是多速率采样(Multi-Rate Sampling)。

频率分离

在每个块内部,N-HiTS 使用下采样将输入分离为不同频率的成分:

class MultiRateSampler(nn.Module):
    def __init__(self, in_len, downsample_factors=[2, 2]):
        super().__init__()
        self.pools = nn.ModuleList([
            nn.MaxPool1d(kernel_size=f) for f in downsample_factors
        ])
        
    def forward(self, x):
        """
        x: [batch, in_len, features]
        返回不同采样率的特征
        """
        features = [x.transpose(1, 2)]  # [batch, features, in_len]
        
        for pool in self.pools:
            downsampled = pool(features[-1])
            features.append(downsampled)
        
        return features[::-1]  # 从粗到细

频率特定块

每个频率成分由专门的块处理:

  • 粗粒度块:处理低频(长期)信息
  • 细粒度块:处理高频(短期)信息

2.3 核心创新:层次插值

N-HiTS 的第二个关键创新是层次插值(Hierarchical Interpolation)。

问题

传统方法在预测时,所有块输出直接求和。但不同频率的预测具有不同的分辨率:

  • 低频块输出短序列
  • 高频块输出长序列

插值机制

class HierarchicalInterpolator(nn.Module):
    def __init__(self, forecast_len, interpolation_modes=['linear', 'nearest']):
        super().__init__()
        self.interp_modes = interpolation_modes
        
    def interpolate(self, x, target_len, mode='linear'):
        """
        将短序列插值到目标长度
        """
        if mode == 'linear':
            # 线性插值
            x_expanded = F.interpolate(
                x.unsqueeze(1),  # [batch, 1, len]
                size=target_len,
                mode='linear',
                align_corners=True
            ).squeeze(1)
        elif mode == 'cubic':
            # 三次插值
            x_expanded = F.interpolate(
                x.unsqueeze(1),
                size=target_len,
                mode='cubic',
                align_corners=True
            ).squeeze(1)
        else:
            # 最近邻
            x_expanded = F.interpolate(
                x.unsqueeze(1),
                size=target_len,
                mode='nearest'
            ).squeeze(1)
        
        return x_expanded

2.4 N-HiTS 完整架构

class NHiTS(nn.Module):
    def __init__(self, in_len, out_len, n_blocks=3, n_layers=2, 
                 n_units=512, pooling_size=2):
        super().__init__()
        
        self.blocks = nn.ModuleList()
        for i in range(n_blocks):
            # 每个块有不同的下采样率
            downsample_factor = pooling_size ** i
            block_in_len = in_len // downsample_factor
            block_out_len = out_len // downsample_factor
            
            self.blocks.append(
                NHiTSBlock(
                    in_len=block_in_len,
                    out_len=block_out_len,
                    n_layers=n_layers,
                    n_units=n_units
                )
            )
        
        # 层次插值器
        self.interpolators = nn.ModuleList([
            HierarchicalInterpolator(out_len, out_len // (pooling_size ** i))
            for i in range(n_blocks)
        ])
    
    def forward(self, x):
        """
        x: [batch, in_len, n_features]
        """
        forecasts = []
        
        for i, (block, interp) in enumerate(zip(self.blocks, self.interpolators)):
            down_x = self.downsample(x, pooling_size ** i)
            _, forecast = block(down_x)
            # 插值到目标长度
            forecast_upsampled = interp.interpolate(forecast, self.out_len, 
                                                   mode='linear')
            forecasts.append(forecast_upsampled)
        
        # 加权融合
        return torch.stack(forecasts, dim=-1).sum(dim=-1)
    
    def downsample(self, x, factor):
        """下采样"""
        return x[:, ::factor, :]

三、对比分析

3.1 N-BEATS vs N-HiTS

特性N-BEATSN-HiTS
采样方式固定速率多速率
频率处理混合分离
插值机制直接求和层次插值
长期预测精度下降精度保持
参数量较大较小(参数共享)
可解释性趋势/季节栈分离频率成分分离

3.2 数学对比

N-BEATS 的输出

其中 是第 个块的基础展开函数。

N-HiTS 的输出

其中 是第 个块的插值函数, 是下采样后的输入。

3.3 在M4数据集上的表现

模型SMAPEMASE
N-BEATS11.7%0.83
N-HiTS11.2%0.79
Transformer13.1%0.91
ARIMA12.5%0.88

四、PyTorch完整实现

4.1 N-HiTS 完整代码

import torch
import torch.nn as nn
import torch.nn.functional as F
import math
 
class NHiTSBlock(nn.Module):
    """N-HiTS 块"""
    
    def __init__(self, in_len, out_len, n_layers=2, n_units=512, 
                 n_hidden_units=128, pooling_kernel_size=4):
        super().__init__()
        
        self.in_len = in_len
        self.out_len = out_len
        
        # 输入投影
        self.input_layer = nn.Linear(in_len, n_units)
        
        # 多层感知机
        self.layers = nn.ModuleList([
            nn.Linear(n_units, n_units) for _ in range(n_layers)
        ])
        
        # 频率感知的下采样
        self.pooling = nn.MaxPool1d(kernel_size=pooling_kernel_size, 
                                     stride=pooling_kernel_size)
        
        # Backcast 和 Forecast 网络
        hidden_len = in_len // pooling_kernel_size
        self.backcast_fc = nn.Sequential(
            nn.Linear(n_units, n_hidden_units),
            nn.ReLU(),
            nn.Linear(n_hidden_units, hidden_len)
        )
        
        forecast_len = out_len // pooling_kernel_size
        self.forecast_fc = nn.Sequential(
            nn.Linear(n_units, n_hidden_units),
            nn.ReLU(),
            nn.Linear(n_hidden_units, forecast_len)
        )
        
    def forward(self, x):
        """
        x: [batch, in_len]
        """
        # 输入投影
        h = self.input_layer(x)
        
        # 多层处理
        for layer in self.layers:
            h = F.gelu(layer(h) + h)  # 残差连接
        
        # 生成预测
        backcast = self.backcast_fc(h)
        forecast = self.forecast_fc(h)
        
        return backcast, forecast
 
 
class NHiTSModel(nn.Module):
    """完整的 N-HiTS 模型"""
    
    def __init__(self, in_len=7*24, out_len=24, n_blocks=3, 
                 n_layers=2, n_units=512, n_hidden=128):
        super().__init__()
        
        self.in_len = in_len
        self.out_len = out_len
        
        # 构建多个块(从粗到细)
        self.blocks = nn.ModuleList()
        pooling_kernel_size = 2
        
        for i in range(n_blocks):
            block_in_len = in_len // (pooling_kernel_size ** i)
            block_out_len = out_len // (pooling_kernel_size ** i)
            
            self.blocks.append(
                NHiTSBlock(
                    in_len=block_in_len,
                    out_len=block_out_len,
                    n_layers=n_layers,
                    n_units=n_units,
                    n_hidden_units=n_hidden,
                    pooling_kernel_size=pooling_kernel_size
                )
            )
    
    def forward(self, x):
        """
        x: [batch, in_len]
        """
        # x 是回看窗口
        total_forecast = torch.zeros(x.shape[0], self.out_len, 
                                    device=x.device)
        
        for block in self.blocks:
            backcast, forecast = block(x)
            
            # 上采样回backcast长度并相减
            if backcast.shape[1] < x.shape[1]:
                backcast_up = F.interpolate(
                    backcast.unsqueeze(1), size=x.shape[1], 
                    mode='linear', align_corners=True
                ).squeeze(1)
                x = x - backcast_up
            
            # 上采样forecast并累加
            forecast_up = F.interpolate(
                forecast.unsqueeze(1), size=self.out_len,
                mode='linear', align_corners=True
            ).squeeze(1)
            total_forecast = total_forecast + forecast_up
        
        return total_forecast

4.2 训练脚本

import numpy as np
import pandas as pd
import torch
from torch.utils.data import DataLoader, TensorDataset
 
def create_sequences(data, in_len, out_len):
    """创建训练序列"""
    X, y = [], []
    for i in range(len(data) - in_len - out_len + 1):
        X.append(data[i:i+in_len])
        y.append(data[i+in_len:i+in_len+out_len])
    return np.array(X), np.array(y)
 
 
def train_nhits(model, train_data, val_data, epochs=100, batch_size=64, lr=1e-3):
    """训练 N-HiTS 模型"""
    
    # 准备数据
    X_train, y_train = create_sequences(train_data, model.in_len, model.out_len)
    X_val, y_val = create_sequences(val_data, model.in_len, model.out_len)
    
    train_loader = DataLoader(
        TensorDataset(torch.FloatTensor(X_train), torch.FloatTensor(y_train)),
        batch_size=batch_size, shuffle=True
    )
    
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, patience=10, factor=0.5
    )
    
    best_val_loss = float('inf')
    
    for epoch in range(epochs):
        # 训练
        model.train()
        train_loss = 0
        for X_batch, y_batch in train_loader:
            optimizer.zero_grad()
            y_pred = model(X_batch)
            loss = F.mse_loss(y_pred, y_batch)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
        
        # 验证
        model.eval()
        with torch.no_grad():
            y_val_pred = model(torch.FloatTensor(X_val))
            val_loss = F.mse_loss(y_val_pred, torch.FloatTensor(y_val))
        
        scheduler.step(val_loss)
        
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), 'nhits_best.pt')
        
        if epoch % 10 == 0:
            print(f"Epoch {epoch}: Train Loss = {train_loss/len(train_loader):.4f}, "
                  f"Val Loss = {val_loss:.4f}")
    
    return model

五、实际应用示例

5.1 电力负荷预测

# 加载数据(假设是每小时的电力负荷)
data = pd.read_csv('electricity.csv')['load'].values
 
# 归一化
from sklearn.preprocessing import StandardScaler
scaler = StandardScaler()
data_scaled = scaler.fit_transform(data.reshape(-1, 1)).flatten()
 
# 划分数据
train_size = int(len(data_scaled) * 0.8)
train_data = data_scaled[:train_size]
test_data = data_scaled[train_size:]
 
# 创建模型
model = NHiTSModel(
    in_len=7*24,   # 回看一周
    out_len=24,    # 预测一天
    n_blocks=3,
    n_layers=2,
    n_units=512
)
 
# 训练
model = train_nhits(model, train_data, test_data, epochs=100)
 
# 预测
model.load_state_dict(torch.load('nhits_best.pt'))
model.eval()
 
with torch.no_grad():
    # 预测未来24小时
    last_window = torch.FloatTensor(train_data[-7*24:]).unsqueeze(0)
    forecast = model(last_window)
    
    # 反归一化
    forecast_unscaled = scaler.inverse_transform(
        forecast.numpy().reshape(-1, 1)
    ).flatten()

5.2 评估指标

def evaluate_forecast(y_true, y_pred, scaler=None):
    """评估预测性能"""
    if scaler:
        y_true = scaler.inverse_transform(y_true.reshape(-1, 1)).flatten()
        y_pred = scaler.inverse_transform(y_pred.reshape(-1, 1)).flatten()
    
    # MSE / RMSE
    mse = np.mean((y_true - y_pred) ** 2)
    rmse = np.sqrt(mse)
    
    # MAE
    mae = np.mean(np.abs(y_true - y_pred))
    
    # MAPE
    mape = np.mean(np.abs((y_true - y_pred) / (y_true + 1e-8))) * 100
    
    # SMAPE
    smape = 200 * np.mean(np.abs(y_pred - y_true) / 
                          (np.abs(y_true) + np.abs(y_pred) + 1e-8))
    
    print(f"RMSE: {rmse:.4f}")
    print(f"MAE: {mae:.4f}")
    print(f"MAPE: {mape:.2f}%")
    print(f"SMAPE: {smape:.2f}%")
    
    return {'rmse': rmse, 'mae': mae, 'mape': mape, 'smape': smape}

六、参考


相关阅读

Footnotes

  1. Oreshkin, B. N., Carpov, D., Chapados, N., & Bengio, Y. (2019). N-BEATS: Neural basis expansion analysis for interpretable time series forecasting. arXiv:1905.10437.

  2. Challu, C., Olivares, K. G., Oreshkin, B. N., Garza, F. G., Canseco, M. M., & Dubrawski, A. (2023). NHITS: Neural hierarchical interpolation for time series forecasting. Pattern Recognition, 123, 108372.