概述

频域分析是时间序列分析的核心方法之一,它通过将时间序列从时域转换到频域,揭示信号的周期结构、频率成分和能量分布。与时域方法相比,频域分析特别擅长处理具有周期性模式的时间序列。1

本章将介绍:

  1. 傅里叶变换基础与频谱分析
  2. 谱密度估计方法
  3. 周期检测技术
  4. 频域特征与深度学习的结合

一、傅里叶变换基础

1.1 离散傅里叶变换(DFT)

对于离散时间序列 ,其离散傅里叶变换定义为:

逆变换为:

1.2 频谱的物理意义

频谱分量频率范围物理意义
直流分量序列均值
低频缓慢变化趋势
高频快速变化细节/噪声

功率谱密度(PSD)

import numpy as np
import matplotlib.pyplot as plt
 
def dft(x):
    """计算离散傅里叶变换"""
    N = len(x)
    n = np.arange(N)
    k = n.reshape((N, 1))
    exp_term = np.exp(-2j * np.pi * k * n / N)
    return np.dot(exp_term, x)
 
 
def idft(X):
    """计算逆离散傅里叶变换"""
    N = len(X)
    n = np.arange(N)
    k = n.reshape((N, 1))
    exp_term = np.exp(2j * np.pi * k * n / N)
    return np.dot(exp_term, X) / N
 
 
def plot_frequency_spectrum(signal, sampling_rate=1.0):
    """绘制频谱图"""
    N = len(signal)
    
    # 使用 FFT(更快)
    X = np.fft.fft(signal)
    
    # 计算频率轴
    freq = np.fft.fftfreq(N, d=1/sampling_rate)
    
    # 计算功率谱
    power = np.abs(X)**2 / N
    
    # 只绘制正频率部分
    pos_mask = freq >= 0
    
    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 8))
    
    # 时域信号
    ax1.plot(signal)
    ax1.set_title('Time Domain Signal')
    ax1.set_xlabel('Time (samples)')
    ax1.set_ylabel('Amplitude')
    ax1.grid(True)
    
    # 频谱
    ax2.stem(freq[pos_mask], power[pos_mask], use_line_collection=True)
    ax2.set_title('Frequency Spectrum')
    ax2.set_xlabel('Frequency (Hz)')
    ax2.set_ylabel('Power')
    ax2.set_xlim([0, sampling_rate/2])
    ax2.grid(True)
    
    plt.tight_layout()
    plt.show()
    
    return freq, power
 
 
# 示例:分析多频率信号
t = np.linspace(0, 1, 256)
signal = (1.0 * np.sin(2 * np.pi * 5 * t) +   # 5 Hz, 幅度1.0
          0.5 * np.sin(2 * np.pi * 15 * t) +  # 15 Hz, 幅度0.5
          0.3 * np.sin(2 * np.pi * 30 * t))   # 30 Hz, 幅度0.3
signal += 0.1 * np.random.randn(len(t))  # 添加噪声
 
plot_frequency_spectrum(signal, sampling_rate=256)

二、功率谱密度估计

2.1 周期图法(Periodogram)

周期图是最简单的谱估计方法:

def periodogram(x, fs=1.0):
    """
    周期图功率谱估计
    
    Args:
        x: 时间序列
        fs: 采样频率
    Returns:
        freq: 频率数组
        power: 功率谱密度
    """
    N = len(x)
    X = np.fft.fft(x)
    power = np.abs(X)**2 / N
    
    freq = np.fft.fftfreq(N, d=1/fs)
    
    # 只返回正频率
    pos_mask = freq >= 0
    return freq[pos_mask], power[pos_mask]
 
 
# Welch 方法:分段的周期图平均
def welch_psd(x, fs=1.0, nperseg=256, noverlap=None):
    """
    Welch 方法:分段平均周期图
    
    通过重叠分段减少方差
    """
    if noverlap is None:
        noverlap = nperseg // 2
    
    # 分段
    n_segments = (len(x) - nperseg) // (nperseg - noverlap) + 1
    
    psd_sum = np.zeros(nperseg)
    for i in range(n_segments):
        start = i * (nperseg - noverlap)
        segment = x[start:start + nperseg]
        
        # 加窗(汉宁窗减少频谱泄漏)
        window = np.hanning(nperseg)
        segment_windowed = segment * window
        
        # 周期图
        X = np.fft.fft(segment_windowed)
        psd_sum += np.abs(X)**2
    
    # 平均
    psd = psd_sum / n_segments
    freq = np.fft.fftfreq(nperseg, d=1/fs)
    
    # 归一化
    psd = psd / (np.sum(window**2) * fs)
    
    pos_mask = freq >= 0
    return freq[pos_mask], psd[pos_mask]

2.2 参数化谱估计

当信号可以用参数模型描述时,参数化方法更有效。

AR 模型谱估计

def ar_psd(y, order, fs=1.0):
    """
    使用 AR 模型估计功率谱密度
    
    使用 Yule-Walker 方法估计 AR 系数
    """
    from scipy.linalg import toeplitz, solve_toeplitz
    
    # 计算自相关
    n = len(y)
    r = np.correlate(y, y, mode='full') / n
    r = r[n-1:]  # 取正延迟部分
    
    # 构建 Toeplitz 矩阵并求解 Yule-Walker 方程
    R = toeplitz(r[:order])
    r_vec = r[1:order+1]
    
    # AR 系数 (使用 Levinson-Durbin 递归更高效)
    from scipy.linalg import solve
    a = solve(R, -r_vec)
    sigma2 = r[0] + np.dot(r_vec, a)
    
    # 计算频率响应
    freq = np.linspace(0, fs/2, 1000)
    w = 2 * np.pi * freq / fs
    
    # AR 系统的频率响应
    denom = 1 - np.sum([a[i] * np.exp(-1j * (i+1) * w) for i in range(order)], axis=0)
    psd = sigma2 / (np.abs(denom)**2)
    
    return freq, psd

2.3 谱估计方法对比

# 对比不同谱估计方法
from scipy import signal as sig
 
t = np.linspace(0, 1, 512)
x = (np.sin(2*np.pi*10*t) + 0.5*np.sin(2*np.pi*30*t) + 
     0.2*np.random.randn(len(t)))
 
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
 
# 1. 周期图
freq1, psd1 = periodogram(x)
axes[0, 0].plot(freq1, 10*np.log10(psd1))
axes[0, 0].set_title('Periodogram')
axes[0, 0].set_xlabel('Frequency (Hz)')
axes[0, 0].set_ylabel('Power (dB)')
axes[0, 0].grid(True)
 
# 2. Welch 方法
freq2, psd2 = welch_psd(x, nperseg=128)
axes[0, 1].plot(freq2, 10*np.log10(psd2))
axes[0, 1].set_title('Welch Method')
axes[0, 1].set_xlabel('Frequency (Hz)')
axes[0, 1].set_ylabel('Power (dB)')
axes[0, 1].grid(True)
 
# 3. AR 模型
freq3, psd3 = ar_psd(x, order=20)
axes[1, 0].plot(freq3, 10*np.log10(psd3))
axes[1, 0].set_title('AR Model (order=20)')
axes[1, 0].set_xlabel('Frequency (Hz)')
axes[1, 0].set_ylabel('Power (dB)')
axes[1, 0].grid(True)
 
# 4. SciPy 信号库
freq4, psd4 = sig.welch(x, nperseg=128)
axes[1, 1].plot(freq4, 10*np.log10(psd4))
axes[1, 1].set_title('scipy.signal.welch')
axes[1, 1].set_xlabel('Frequency (Hz)')
axes[1, 1].set_ylabel('Power (dB)')
axes[1, 1].grid(True)
 
plt.tight_layout()
plt.show()

三、周期检测

3.1 自相关函数(ACF)

自相关函数是检测周期性的经典方法:

def autocorrelation(x, max_lag=None):
    """计算自相关函数"""
    n = len(x)
    if max_lag is None:
        max_lag = n - 1
    
    x_centered = x - np.mean(x)
    
    acf = np.correlate(x_centered, x_centered, mode='full')
    acf = acf[n-1:n+max_lag]
    acf = acf / acf[0]  # 归一化
    
    return np.arange(max_lag + 1), acf
 
 
def plot_acf_with_periods(x, max_lag=None):
    """绘制自相关图并标注显著周期"""
    lags, acf = autocorrelation(x, max_lag)
    
    plt.figure(figsize=(12, 6))
    plt.stem(lags, acf, use_line_collection=True)
    
    # 置信区间(95%)
    n = len(x)
    confidence = 1.96 / np.sqrt(n)
    plt.axhline(y=confidence, color='r', linestyle='--', label='95% CI')
    plt.axhline(y=-confidence, color='r', linestyle='--')
    
    # 找峰值(排除 lag=0)
    from scipy.signal import find_peaks
    peaks, properties = find_peaks(acf[1:], height=confidence, distance=5)
    
    for peak in peaks:
        period = peak + 1  # lag 从 1 开始
        plt.annotate(f'P={period}', xy=(peak+1, acf[peak+1]),
                    xytext=(peak+10, acf[peak+1]+0.1),
                    arrowprops=dict(arrowstyle='->', color='green'))
    
    plt.xlabel('Lag')
    plt.ylabel('Autocorrelation')
    plt.title('Autocorrelation Function')
    plt.grid(True)
    plt.legend()
    plt.show()
    
    return peaks + 1  # 返回检测到的周期

3.2 傅里叶谱峰检测

def detect_periods_via_fft(x, fs=1.0, top_n=5, min_period=2):
    """
    通过 FFT 检测主要周期
    
    Returns:
        periods: 检测到的周期列表
        powers: 对应的功率
    """
    N = len(x)
    
    # 去均值(移除直流分量)
    x_centered = x - np.mean(x)
    
    # FFT
    X = np.fft.fft(x_centered)
    power = np.abs(X)**2 / N
    
    # 只考虑正频率
    freq = np.fft.fftfreq(N, d=1/fs)
    pos_mask = (freq > 0) & (freq <= fs/2)
    
    freq_pos = freq[pos_mask]
    power_pos = power[pos_mask]
    
    # 转换为周期
    periods = 1 / freq_pos
    
    # 只考虑合理范围内的周期
    valid_mask = (periods >= min_period) & (periods <= N/2)
    
    # 找峰值
    from scipy.signal import find_peaks
    peaks, props = find_peaks(power_pos[valid_mask], height=np.percentile(power_pos[valid_mask], 90))
    
    # 按功率排序,取前 top_n
    peak_powers = power_pos[valid_mask][peaks]
    sorted_idx = np.argsort(peak_powers)[::-1][:top_n]
    
    detected_periods = periods[valid_mask][peaks[sorted_idx]]
    detected_powers = peak_powers[sorted_idx]
    
    return detected_periods, detected_powers
 
 
# 示例
periods, powers = detect_periods_via_fft(signal)
print("检测到的周期:")
for p, pw in zip(periods, powers):
    print(f"  周期 = {p:.2f} (功率 = {pw:.4f})")

3.3 谱分析法

def spectral_analysis_periodogram(x, fs=1.0):
    """
    使用周期图进行谱分析
    
    返回显著频率和周期
    """
    freq, psd = periodogram(x, fs)
    
    # 找峰值
    from scipy.signal import find_peaks
    peaks, _ = find_peaks(psd, height=np.mean(psd) + 2*np.std(psd))
    
    results = []
    for peak in peaks:
        results.append({
            'frequency': freq[peak],
            'period': 1/freq[peak] if freq[peak] > 0 else np.inf,
            'power': psd[peak]
        })
    
    return sorted(results, key=lambda x: x['power'], reverse=True)
 
 
# 综合周期检测
def comprehensive_period_detection(x, fs=1.0):
    """
    综合多种方法的周期检测
    """
    print("=" * 50)
    print("综合周期检测结果")
    print("=" * 50)
    
    # 1. ACF
    print("\n[1] 自相关方法:")
    acf_periods = plot_acf_with_periods(x)
    print(f"    检测到的周期: {acf_periods}")
    
    # 2. FFT
    print("\n[2] FFT 谱峰检测:")
    fft_periods, fft_powers = detect_periods_via_fft(x, fs)
    for p, pw in zip(fft_periods, fft_powers):
        print(f"    周期 = {p:.2f}")
    
    # 3. 谱分析
    print("\n[3] 谱分析:")
    spectral_results = spectral_analysis_periodogram(x, fs)
    for r in spectral_results[:5]:
        print(f"    周期 = {r['period']:.2f}, 频率 = {r['frequency']:.4f} Hz")
    
    return {
        'acf_periods': acf_periods,
        'fft_periods': fft_periods,
        'spectral_periods': [r['period'] for r in spectral_results]
    }

四、频域特征提取

4.1 常用频域特征

def extract_frequency_features(x, fs=1.0):
    """
    提取频域特征
    """
    freq, psd = periodogram(x, fs)
    cumsum_psd = np.cumsum(psd)
    total_power = cumsum_psd[-1]
    
    features = {}
    
    # 1. 谱重心 (Spectral Centroid)
    features['spectral_centroid'] = np.sum(freq * psd) / np.sum(psd)
    
    # 2. 谱熵 (Spectral Entropy)
    psd_norm = psd / (np.sum(psd) + 1e-10)
    features['spectral_entropy'] = -np.sum(psd_norm * np.log2(psd_norm + 1e-10))
    
    # 3. 谱平坦度 (Spectral Flatness) - 衡量噪声 vs 谐波
    geometric_mean = np.exp(np.mean(np.log(psd + 1e-10)))
    arithmetic_mean = np.mean(psd)
    features['spectral_flatness'] = geometric_mean / (arithmetic_mean + 1e-10)
    
    # 4. 主频率
    features['dominant_frequency'] = freq[np.argmax(psd)]
    features['dominant_period'] = 1 / (features['dominant_frequency'] + 1e-10)
    
    # 5. 频段能量
    n = len(freq)
    low_band = slice(0, n//4)
    mid_band = slice(n//4, n//2)
    high_band = slice(n//2, n)
    
    features['low_band_power'] = np.sum(psd[low_band]) / total_power
    features['mid_band_power'] = np.sum(psd[mid_band]) / total_power
    features['high_band_power'] = np.sum(psd[high_band]) / total_power
    
    # 6. 频谱质心时间变化(时变信号的频域特征)
    # ...
    
    return features
 
 
# 计算并显示特征
features = extract_frequency_features(signal)
print("频域特征:")
for name, value in features.items():
    print(f"  {name}: {value:.4f}")

4.2 滚动频域特征

def rolling_frequency_features(x, window_size=50, hop_size=1, fs=1.0):
    """
    计算滚动频域特征
    
    适用于非平稳信号
    """
    n = len(x)
    n_windows = (n - window_size) // hop_size + 1
    
    all_features = []
    time_indices = []
    
    for i in range(n_windows):
        start = i * hop_size
        end = start + window_size
        window = x[start:end]
        
        feats = extract_frequency_features(window, fs)
        all_features.append(feats)
        time_indices.append(start + window_size // 2)
    
    return time_indices, all_features
 
 
# 可视化滚动特征
time_idx, feats_list = rolling_frequency_features(signal, window_size=100, hop_size=10)
 
# 转为 DataFrame 便于分析
import pandas as pd
df_feats = pd.DataFrame(feats_list, index=time_idx)
 
# 绘图
fig, axes = plt.subplots(3, 1, figsize=(14, 10))
 
axes[0].plot(signal)
axes[0].set_title('Original Signal')
for idx in time_idx:
    axes[0].axvline(idx, color='gray', alpha=0.1)
 
axes[1].plot(time_idx, df_feats['dominant_frequency'])
axes[1].set_title('Dominant Frequency over Time')
axes[1].set_ylabel('Frequency (Hz)')
 
axes[2].plot(time_idx, df_feats['spectral_centroid'])
axes[2].set_title('Spectral Centroid over Time')
axes[2].set_ylabel('Frequency (Hz)')
 
plt.tight_layout()
plt.show()

五、频域与深度学习的结合

5.1 频域输入特征

class FrequencyDomainFeatures(nn.Module):
    """
    频域特征提取模块
    """
    def __init__(self, n_fft=256, hop_length=128, n_mels=64):
        super().__init__()
        
        self.n_fft = n_fft
        self.hop_length = hop_length
        
        # 可以使用 STFT 获取时频表示
        self.register_buffer('window', torch.hann_window(n_fft))
    
    def forward(self, x):
        """
        Args:
            x: [batch, seq_len] 时间序列
        Returns:
            features: [batch, n_features]
        """
        # 短时傅里叶变换
        stft = torch.stft(
            x, 
            n_fft=self.n_fft,
            hop_length=self.hop_length,
            window=self.window,
            return_complex=True
        )
        
        # 幅度谱
        mag = torch.abs(stft)  # [batch, freq, time]
        
        # 功率谱
        power = mag ** 2
        
        # 全局平均
        mean_power = power.mean(dim=-1)  # [batch, freq]
        
        # 统计特征
        features = []
        
        # 1. 对数功率均值
        log_power = torch.log(power + 1e-10)
        features.append(log_power.mean(dim=-1))
        
        # 2. 谱质心
        freq_bins = torch.arange(mag.shape[1], device=x.device).float()
        spectral_centroid = (mag * freq_bins.unsqueeze(0).unsqueeze(-1)).sum(dim=1) / (mag.sum(dim=1) + 1e-10)
        features.append(spectral_centroid.mean(dim=-1))
        
        # 3. 谱熵
        p_norm = power / (power.sum(dim=1, keepdim=True) + 1e-10)
        spectral_entropy = -(p_norm * torch.log(p_norm + 1e-10)).sum(dim=1)
        features.append(spectral_entropy.mean(dim=-1))
        
        return torch.cat(features, dim=-1)

5.2 FEDformer 架构

FEDformer(Fourier Enhanced Decomposition Transformer)是结合频域分析的时间序列预测模型:

class FourierBasisDecomposition(nn.Module):
    """
    傅里叶基分解
    
    将时间序列分解为多个频率成分
    """
    def __init__(self, n_components=5):
        super().__init__()
        self.n_components = n_components
        
    def forward(self, x):
        """
        Args:
            x: [batch, seq_len]
        Returns:
            low_freq: 低频成分
            high_freq: 高频成分
        """
        batch_size, seq_len = x.shape
        
        # FFT
        X = torch.fft.fft(x, dim=1)
        
        # 按频率排序并分离
        magnitude = torch.abs(X)
        
        # 取最高频率成分(高频噪声)
        k = self.n_components
        high_freq_indices = torch.topk(magnitude[:, k:-k], k=k, dim=1).indices
        
        # 低频部分
        X_low = X.clone()
        for b in range(batch_size):
            for idx in high_freq_indices[b]:
                X_low[b, idx] = 0
                X_low[b, seq_len - idx] = 0
        
        high_freq_indices_set = set()
        for b in range(batch_size):
            for idx in high_freq_indices[b]:
                high_freq_indices_set.add((b, idx.item()))
                high_freq_indices_set.add((b, seq_len - idx.item()))
        
        X_high = torch.zeros_like(X)
        for b, idx in high_freq_indices_set:
            X_high[b, idx] = X[b, idx]
        
        # 逆变换
        x_low = torch.fft.ifft(X_low, dim=1).real
        x_high = torch.fft.ifft(X_high, dim=1).real
        
        return x_low, x_high
 
 
class FEDformerEncoder(nn.Module):
    """
    FEDformer 风格的编码器
    """
    def __init__(self, d_model=128, n_heads=4, n_components=5):
        super().__init__()
        
        self.fourier_decomp = FourierBasisDecomposition(n_components)
        
        # 低频分支
        self.low_encoder_layers = nn.ModuleList([
            nn.TransformerEncoderLayer(
                d_model=d_model,
                nhead=n_heads,
                dim_feedforward=d_model * 4,
                batch_first=True
            ) for _ in range(2)
        ])
        
        # 高频分支
        self.high_encoder_layers = nn.ModuleList([
            nn.TransformerEncoderLayer(
                d_model=d_model,
                nhead=n_heads,
                dim_feedforward=d_model * 4,
                batch_first=True
            ) for _ in range(2)
        ])
        
        # 投影层
        self.projection = nn.Linear(d_model * 2, d_model)
        
    def forward(self, x):
        """
        x: [batch, seq_len, d_model]
        """
        # 分离频率成分
        x_low_freq, x_high_freq = self.fourier_decomp(x)
        
        # 分别编码
        for layer in self.low_encoder_layers:
            x_low_freq = layer(x_low_freq)
        
        for layer in self.high_encoder_layers:
            x_high_freq = layer(x_high_freq)
        
        # 拼接
        x_combined = torch.cat([x_low_freq, x_high_freq], dim=-1)
        
        # 投影回原始维度
        x_out = self.projection(x_combined)
        
        return x_out

5.3 Neural Fourier Transform

Neural Fourier Transform 将傅里叶分析作为神经网络的一层:

class NeuralFourierTransform(nn.Module):
    """
    可学习的傅里叶变换层
    
    学习最适合任务的频率表示
    """
    def __init__(self, input_dim, n_frequencies=64):
        super().__init__()
        
        self.n_frequencies = n_frequencies
        
        # 可学习的频率基函数参数
        self.freq_params = nn.Parameter(
            torch.randn(n_frequencies, input_dim) * 0.1
        )
        
        # 频率振幅
        self.amplitude_params = nn.Parameter(torch.ones(n_frequencies))
        
        # 相位偏移
        self.phase_params = nn.Parameter(torch.zeros(n_frequencies))
    
    def forward(self, x):
        """
        Args:
            x: [batch, seq_len, input_dim]
        Returns:
            freq_features: [batch, n_frequencies]
        """
        batch_size, seq_len, _ = x.shape
        
        # 计算每个频率与输入的相关性
        # x_flat: [batch * seq_len, input_dim]
        x_flat = x.reshape(-1, x.shape[-1])
        
        # 计算频率响应
        # cosine_similarity: [batch * seq_len, n_frequencies]
        cos_sim = torch.matmul(x_flat, self.freq_params.t())
        
        # 应用振幅和相位
        freq_response = self.amplitude_params.unsqueeze(0) * torch.cos(
            cos_sim + self.phase_params.unsqueeze(0)
        )
        
        # 重新整形并池化
        freq_response = freq_response.reshape(batch_size, seq_len, -1)
        freq_features = freq_response.mean(dim=1)  # 时间池化
        
        return freq_features
 
 
class NFTTBlock(nn.Module):
    """
    Neural Fourier Transform 块
    
    结合频域特征和时域卷积
    """
    def __init__(self, d_model=128, n_frequencies=64, kernel_size=3):
        super().__init__()
        
        self.fourier_transform = NeuralFourierTransform(d_model, n_frequencies)
        
        self.freq_projection = nn.Linear(n_frequencies, d_model)
        
        self.temporal_conv = nn.Conv1d(
            d_model, d_model, kernel_size,
            padding=kernel_size//2
        )
        
        self.norm = nn.LayerNorm(d_model)
        
    def forward(self, x):
        """
        x: [batch, seq_len, d_model]
        """
        # 频域分支
        freq_features = self.fourier_transform(x)
        freq_out = self.freq_projection(freq_features)
        
        # 时域分支
        x_conv = x.transpose(1, 2)  # [batch, d_model, seq_len]
        temporal_out = self.temporal_conv(x_conv).transpose(1, 2)  # [batch, seq_len, d_model]
        
        # 融合
        out = freq_out.unsqueeze(1) + temporal_out
        out = self.norm(out)
        
        return out

六、实践指南

6.1 参数选择

参数选择建议
FFT 大小 (N)2的幂次,通常 256, 512, 1024, 2048
窗口类型汉宁窗减少频谱泄漏,矩形窗分辨率最高
重叠比例Welch 方法建议 50-75% 重叠
AR 模型阶数AIC/BIC 准则选择,或使用

6.2 频率分辨率

其中 是采样频率, 是 FFT 大小。

def frequency_resolution(sampling_rate, n_fft):
    """计算频率分辨率"""
    return sampling_rate / n_fft
 
 
# 示例
print(f"256 点 FFT, 采样率 1000 Hz:")
print(f"  分辨率 = {frequency_resolution(1000, 256):.2f} Hz")
print(f"1024 点 FFT, 采样率 1000 Hz:")
print(f"  分辨率 = {frequency_resolution(1000, 1024):.2f} Hz")

七、参考


相关阅读

Footnotes

  1. Proakis, J. G., & Manolakis, D. G. (2007). Digital Signal Processing: Principles, Algorithms, and Applications. Prentice Hall.