概述
频域分析是时间序列分析的核心方法之一,它通过将时间序列从时域转换到频域,揭示信号的周期结构、频率成分和能量分布。与时域方法相比,频域分析特别擅长处理具有周期性模式的时间序列。1
本章将介绍:
- 傅里叶变换基础与频谱分析
- 谱密度估计方法
- 周期检测技术
- 频域特征与深度学习的结合
一、傅里叶变换基础
1.1 离散傅里叶变换(DFT)
对于离散时间序列 ,其离散傅里叶变换定义为:
逆变换为:
1.2 频谱的物理意义
| 频谱分量 | 频率范围 | 物理意义 |
|---|---|---|
| 直流分量 | 序列均值 | |
| 低频 | 缓慢变化趋势 | |
| 高频 | 快速变化细节/噪声 |
功率谱密度(PSD):
import numpy as np
import matplotlib.pyplot as plt
def dft(x):
"""计算离散傅里叶变换"""
N = len(x)
n = np.arange(N)
k = n.reshape((N, 1))
exp_term = np.exp(-2j * np.pi * k * n / N)
return np.dot(exp_term, x)
def idft(X):
"""计算逆离散傅里叶变换"""
N = len(X)
n = np.arange(N)
k = n.reshape((N, 1))
exp_term = np.exp(2j * np.pi * k * n / N)
return np.dot(exp_term, X) / N
def plot_frequency_spectrum(signal, sampling_rate=1.0):
"""绘制频谱图"""
N = len(signal)
# 使用 FFT(更快)
X = np.fft.fft(signal)
# 计算频率轴
freq = np.fft.fftfreq(N, d=1/sampling_rate)
# 计算功率谱
power = np.abs(X)**2 / N
# 只绘制正频率部分
pos_mask = freq >= 0
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 8))
# 时域信号
ax1.plot(signal)
ax1.set_title('Time Domain Signal')
ax1.set_xlabel('Time (samples)')
ax1.set_ylabel('Amplitude')
ax1.grid(True)
# 频谱
ax2.stem(freq[pos_mask], power[pos_mask], use_line_collection=True)
ax2.set_title('Frequency Spectrum')
ax2.set_xlabel('Frequency (Hz)')
ax2.set_ylabel('Power')
ax2.set_xlim([0, sampling_rate/2])
ax2.grid(True)
plt.tight_layout()
plt.show()
return freq, power
# 示例:分析多频率信号
t = np.linspace(0, 1, 256)
signal = (1.0 * np.sin(2 * np.pi * 5 * t) + # 5 Hz, 幅度1.0
0.5 * np.sin(2 * np.pi * 15 * t) + # 15 Hz, 幅度0.5
0.3 * np.sin(2 * np.pi * 30 * t)) # 30 Hz, 幅度0.3
signal += 0.1 * np.random.randn(len(t)) # 添加噪声
plot_frequency_spectrum(signal, sampling_rate=256)二、功率谱密度估计
2.1 周期图法(Periodogram)
周期图是最简单的谱估计方法:
def periodogram(x, fs=1.0):
"""
周期图功率谱估计
Args:
x: 时间序列
fs: 采样频率
Returns:
freq: 频率数组
power: 功率谱密度
"""
N = len(x)
X = np.fft.fft(x)
power = np.abs(X)**2 / N
freq = np.fft.fftfreq(N, d=1/fs)
# 只返回正频率
pos_mask = freq >= 0
return freq[pos_mask], power[pos_mask]
# Welch 方法:分段的周期图平均
def welch_psd(x, fs=1.0, nperseg=256, noverlap=None):
"""
Welch 方法:分段平均周期图
通过重叠分段减少方差
"""
if noverlap is None:
noverlap = nperseg // 2
# 分段
n_segments = (len(x) - nperseg) // (nperseg - noverlap) + 1
psd_sum = np.zeros(nperseg)
for i in range(n_segments):
start = i * (nperseg - noverlap)
segment = x[start:start + nperseg]
# 加窗(汉宁窗减少频谱泄漏)
window = np.hanning(nperseg)
segment_windowed = segment * window
# 周期图
X = np.fft.fft(segment_windowed)
psd_sum += np.abs(X)**2
# 平均
psd = psd_sum / n_segments
freq = np.fft.fftfreq(nperseg, d=1/fs)
# 归一化
psd = psd / (np.sum(window**2) * fs)
pos_mask = freq >= 0
return freq[pos_mask], psd[pos_mask]2.2 参数化谱估计
当信号可以用参数模型描述时,参数化方法更有效。
AR 模型谱估计:
def ar_psd(y, order, fs=1.0):
"""
使用 AR 模型估计功率谱密度
使用 Yule-Walker 方法估计 AR 系数
"""
from scipy.linalg import toeplitz, solve_toeplitz
# 计算自相关
n = len(y)
r = np.correlate(y, y, mode='full') / n
r = r[n-1:] # 取正延迟部分
# 构建 Toeplitz 矩阵并求解 Yule-Walker 方程
R = toeplitz(r[:order])
r_vec = r[1:order+1]
# AR 系数 (使用 Levinson-Durbin 递归更高效)
from scipy.linalg import solve
a = solve(R, -r_vec)
sigma2 = r[0] + np.dot(r_vec, a)
# 计算频率响应
freq = np.linspace(0, fs/2, 1000)
w = 2 * np.pi * freq / fs
# AR 系统的频率响应
denom = 1 - np.sum([a[i] * np.exp(-1j * (i+1) * w) for i in range(order)], axis=0)
psd = sigma2 / (np.abs(denom)**2)
return freq, psd2.3 谱估计方法对比
# 对比不同谱估计方法
from scipy import signal as sig
t = np.linspace(0, 1, 512)
x = (np.sin(2*np.pi*10*t) + 0.5*np.sin(2*np.pi*30*t) +
0.2*np.random.randn(len(t)))
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
# 1. 周期图
freq1, psd1 = periodogram(x)
axes[0, 0].plot(freq1, 10*np.log10(psd1))
axes[0, 0].set_title('Periodogram')
axes[0, 0].set_xlabel('Frequency (Hz)')
axes[0, 0].set_ylabel('Power (dB)')
axes[0, 0].grid(True)
# 2. Welch 方法
freq2, psd2 = welch_psd(x, nperseg=128)
axes[0, 1].plot(freq2, 10*np.log10(psd2))
axes[0, 1].set_title('Welch Method')
axes[0, 1].set_xlabel('Frequency (Hz)')
axes[0, 1].set_ylabel('Power (dB)')
axes[0, 1].grid(True)
# 3. AR 模型
freq3, psd3 = ar_psd(x, order=20)
axes[1, 0].plot(freq3, 10*np.log10(psd3))
axes[1, 0].set_title('AR Model (order=20)')
axes[1, 0].set_xlabel('Frequency (Hz)')
axes[1, 0].set_ylabel('Power (dB)')
axes[1, 0].grid(True)
# 4. SciPy 信号库
freq4, psd4 = sig.welch(x, nperseg=128)
axes[1, 1].plot(freq4, 10*np.log10(psd4))
axes[1, 1].set_title('scipy.signal.welch')
axes[1, 1].set_xlabel('Frequency (Hz)')
axes[1, 1].set_ylabel('Power (dB)')
axes[1, 1].grid(True)
plt.tight_layout()
plt.show()三、周期检测
3.1 自相关函数(ACF)
自相关函数是检测周期性的经典方法:
def autocorrelation(x, max_lag=None):
"""计算自相关函数"""
n = len(x)
if max_lag is None:
max_lag = n - 1
x_centered = x - np.mean(x)
acf = np.correlate(x_centered, x_centered, mode='full')
acf = acf[n-1:n+max_lag]
acf = acf / acf[0] # 归一化
return np.arange(max_lag + 1), acf
def plot_acf_with_periods(x, max_lag=None):
"""绘制自相关图并标注显著周期"""
lags, acf = autocorrelation(x, max_lag)
plt.figure(figsize=(12, 6))
plt.stem(lags, acf, use_line_collection=True)
# 置信区间(95%)
n = len(x)
confidence = 1.96 / np.sqrt(n)
plt.axhline(y=confidence, color='r', linestyle='--', label='95% CI')
plt.axhline(y=-confidence, color='r', linestyle='--')
# 找峰值(排除 lag=0)
from scipy.signal import find_peaks
peaks, properties = find_peaks(acf[1:], height=confidence, distance=5)
for peak in peaks:
period = peak + 1 # lag 从 1 开始
plt.annotate(f'P={period}', xy=(peak+1, acf[peak+1]),
xytext=(peak+10, acf[peak+1]+0.1),
arrowprops=dict(arrowstyle='->', color='green'))
plt.xlabel('Lag')
plt.ylabel('Autocorrelation')
plt.title('Autocorrelation Function')
plt.grid(True)
plt.legend()
plt.show()
return peaks + 1 # 返回检测到的周期3.2 傅里叶谱峰检测
def detect_periods_via_fft(x, fs=1.0, top_n=5, min_period=2):
"""
通过 FFT 检测主要周期
Returns:
periods: 检测到的周期列表
powers: 对应的功率
"""
N = len(x)
# 去均值(移除直流分量)
x_centered = x - np.mean(x)
# FFT
X = np.fft.fft(x_centered)
power = np.abs(X)**2 / N
# 只考虑正频率
freq = np.fft.fftfreq(N, d=1/fs)
pos_mask = (freq > 0) & (freq <= fs/2)
freq_pos = freq[pos_mask]
power_pos = power[pos_mask]
# 转换为周期
periods = 1 / freq_pos
# 只考虑合理范围内的周期
valid_mask = (periods >= min_period) & (periods <= N/2)
# 找峰值
from scipy.signal import find_peaks
peaks, props = find_peaks(power_pos[valid_mask], height=np.percentile(power_pos[valid_mask], 90))
# 按功率排序,取前 top_n
peak_powers = power_pos[valid_mask][peaks]
sorted_idx = np.argsort(peak_powers)[::-1][:top_n]
detected_periods = periods[valid_mask][peaks[sorted_idx]]
detected_powers = peak_powers[sorted_idx]
return detected_periods, detected_powers
# 示例
periods, powers = detect_periods_via_fft(signal)
print("检测到的周期:")
for p, pw in zip(periods, powers):
print(f" 周期 = {p:.2f} (功率 = {pw:.4f})")3.3 谱分析法
def spectral_analysis_periodogram(x, fs=1.0):
"""
使用周期图进行谱分析
返回显著频率和周期
"""
freq, psd = periodogram(x, fs)
# 找峰值
from scipy.signal import find_peaks
peaks, _ = find_peaks(psd, height=np.mean(psd) + 2*np.std(psd))
results = []
for peak in peaks:
results.append({
'frequency': freq[peak],
'period': 1/freq[peak] if freq[peak] > 0 else np.inf,
'power': psd[peak]
})
return sorted(results, key=lambda x: x['power'], reverse=True)
# 综合周期检测
def comprehensive_period_detection(x, fs=1.0):
"""
综合多种方法的周期检测
"""
print("=" * 50)
print("综合周期检测结果")
print("=" * 50)
# 1. ACF
print("\n[1] 自相关方法:")
acf_periods = plot_acf_with_periods(x)
print(f" 检测到的周期: {acf_periods}")
# 2. FFT
print("\n[2] FFT 谱峰检测:")
fft_periods, fft_powers = detect_periods_via_fft(x, fs)
for p, pw in zip(fft_periods, fft_powers):
print(f" 周期 = {p:.2f}")
# 3. 谱分析
print("\n[3] 谱分析:")
spectral_results = spectral_analysis_periodogram(x, fs)
for r in spectral_results[:5]:
print(f" 周期 = {r['period']:.2f}, 频率 = {r['frequency']:.4f} Hz")
return {
'acf_periods': acf_periods,
'fft_periods': fft_periods,
'spectral_periods': [r['period'] for r in spectral_results]
}四、频域特征提取
4.1 常用频域特征
def extract_frequency_features(x, fs=1.0):
"""
提取频域特征
"""
freq, psd = periodogram(x, fs)
cumsum_psd = np.cumsum(psd)
total_power = cumsum_psd[-1]
features = {}
# 1. 谱重心 (Spectral Centroid)
features['spectral_centroid'] = np.sum(freq * psd) / np.sum(psd)
# 2. 谱熵 (Spectral Entropy)
psd_norm = psd / (np.sum(psd) + 1e-10)
features['spectral_entropy'] = -np.sum(psd_norm * np.log2(psd_norm + 1e-10))
# 3. 谱平坦度 (Spectral Flatness) - 衡量噪声 vs 谐波
geometric_mean = np.exp(np.mean(np.log(psd + 1e-10)))
arithmetic_mean = np.mean(psd)
features['spectral_flatness'] = geometric_mean / (arithmetic_mean + 1e-10)
# 4. 主频率
features['dominant_frequency'] = freq[np.argmax(psd)]
features['dominant_period'] = 1 / (features['dominant_frequency'] + 1e-10)
# 5. 频段能量
n = len(freq)
low_band = slice(0, n//4)
mid_band = slice(n//4, n//2)
high_band = slice(n//2, n)
features['low_band_power'] = np.sum(psd[low_band]) / total_power
features['mid_band_power'] = np.sum(psd[mid_band]) / total_power
features['high_band_power'] = np.sum(psd[high_band]) / total_power
# 6. 频谱质心时间变化(时变信号的频域特征)
# ...
return features
# 计算并显示特征
features = extract_frequency_features(signal)
print("频域特征:")
for name, value in features.items():
print(f" {name}: {value:.4f}")4.2 滚动频域特征
def rolling_frequency_features(x, window_size=50, hop_size=1, fs=1.0):
"""
计算滚动频域特征
适用于非平稳信号
"""
n = len(x)
n_windows = (n - window_size) // hop_size + 1
all_features = []
time_indices = []
for i in range(n_windows):
start = i * hop_size
end = start + window_size
window = x[start:end]
feats = extract_frequency_features(window, fs)
all_features.append(feats)
time_indices.append(start + window_size // 2)
return time_indices, all_features
# 可视化滚动特征
time_idx, feats_list = rolling_frequency_features(signal, window_size=100, hop_size=10)
# 转为 DataFrame 便于分析
import pandas as pd
df_feats = pd.DataFrame(feats_list, index=time_idx)
# 绘图
fig, axes = plt.subplots(3, 1, figsize=(14, 10))
axes[0].plot(signal)
axes[0].set_title('Original Signal')
for idx in time_idx:
axes[0].axvline(idx, color='gray', alpha=0.1)
axes[1].plot(time_idx, df_feats['dominant_frequency'])
axes[1].set_title('Dominant Frequency over Time')
axes[1].set_ylabel('Frequency (Hz)')
axes[2].plot(time_idx, df_feats['spectral_centroid'])
axes[2].set_title('Spectral Centroid over Time')
axes[2].set_ylabel('Frequency (Hz)')
plt.tight_layout()
plt.show()五、频域与深度学习的结合
5.1 频域输入特征
class FrequencyDomainFeatures(nn.Module):
"""
频域特征提取模块
"""
def __init__(self, n_fft=256, hop_length=128, n_mels=64):
super().__init__()
self.n_fft = n_fft
self.hop_length = hop_length
# 可以使用 STFT 获取时频表示
self.register_buffer('window', torch.hann_window(n_fft))
def forward(self, x):
"""
Args:
x: [batch, seq_len] 时间序列
Returns:
features: [batch, n_features]
"""
# 短时傅里叶变换
stft = torch.stft(
x,
n_fft=self.n_fft,
hop_length=self.hop_length,
window=self.window,
return_complex=True
)
# 幅度谱
mag = torch.abs(stft) # [batch, freq, time]
# 功率谱
power = mag ** 2
# 全局平均
mean_power = power.mean(dim=-1) # [batch, freq]
# 统计特征
features = []
# 1. 对数功率均值
log_power = torch.log(power + 1e-10)
features.append(log_power.mean(dim=-1))
# 2. 谱质心
freq_bins = torch.arange(mag.shape[1], device=x.device).float()
spectral_centroid = (mag * freq_bins.unsqueeze(0).unsqueeze(-1)).sum(dim=1) / (mag.sum(dim=1) + 1e-10)
features.append(spectral_centroid.mean(dim=-1))
# 3. 谱熵
p_norm = power / (power.sum(dim=1, keepdim=True) + 1e-10)
spectral_entropy = -(p_norm * torch.log(p_norm + 1e-10)).sum(dim=1)
features.append(spectral_entropy.mean(dim=-1))
return torch.cat(features, dim=-1)5.2 FEDformer 架构
FEDformer(Fourier Enhanced Decomposition Transformer)是结合频域分析的时间序列预测模型:
class FourierBasisDecomposition(nn.Module):
"""
傅里叶基分解
将时间序列分解为多个频率成分
"""
def __init__(self, n_components=5):
super().__init__()
self.n_components = n_components
def forward(self, x):
"""
Args:
x: [batch, seq_len]
Returns:
low_freq: 低频成分
high_freq: 高频成分
"""
batch_size, seq_len = x.shape
# FFT
X = torch.fft.fft(x, dim=1)
# 按频率排序并分离
magnitude = torch.abs(X)
# 取最高频率成分(高频噪声)
k = self.n_components
high_freq_indices = torch.topk(magnitude[:, k:-k], k=k, dim=1).indices
# 低频部分
X_low = X.clone()
for b in range(batch_size):
for idx in high_freq_indices[b]:
X_low[b, idx] = 0
X_low[b, seq_len - idx] = 0
high_freq_indices_set = set()
for b in range(batch_size):
for idx in high_freq_indices[b]:
high_freq_indices_set.add((b, idx.item()))
high_freq_indices_set.add((b, seq_len - idx.item()))
X_high = torch.zeros_like(X)
for b, idx in high_freq_indices_set:
X_high[b, idx] = X[b, idx]
# 逆变换
x_low = torch.fft.ifft(X_low, dim=1).real
x_high = torch.fft.ifft(X_high, dim=1).real
return x_low, x_high
class FEDformerEncoder(nn.Module):
"""
FEDformer 风格的编码器
"""
def __init__(self, d_model=128, n_heads=4, n_components=5):
super().__init__()
self.fourier_decomp = FourierBasisDecomposition(n_components)
# 低频分支
self.low_encoder_layers = nn.ModuleList([
nn.TransformerEncoderLayer(
d_model=d_model,
nhead=n_heads,
dim_feedforward=d_model * 4,
batch_first=True
) for _ in range(2)
])
# 高频分支
self.high_encoder_layers = nn.ModuleList([
nn.TransformerEncoderLayer(
d_model=d_model,
nhead=n_heads,
dim_feedforward=d_model * 4,
batch_first=True
) for _ in range(2)
])
# 投影层
self.projection = nn.Linear(d_model * 2, d_model)
def forward(self, x):
"""
x: [batch, seq_len, d_model]
"""
# 分离频率成分
x_low_freq, x_high_freq = self.fourier_decomp(x)
# 分别编码
for layer in self.low_encoder_layers:
x_low_freq = layer(x_low_freq)
for layer in self.high_encoder_layers:
x_high_freq = layer(x_high_freq)
# 拼接
x_combined = torch.cat([x_low_freq, x_high_freq], dim=-1)
# 投影回原始维度
x_out = self.projection(x_combined)
return x_out5.3 Neural Fourier Transform
Neural Fourier Transform 将傅里叶分析作为神经网络的一层:
class NeuralFourierTransform(nn.Module):
"""
可学习的傅里叶变换层
学习最适合任务的频率表示
"""
def __init__(self, input_dim, n_frequencies=64):
super().__init__()
self.n_frequencies = n_frequencies
# 可学习的频率基函数参数
self.freq_params = nn.Parameter(
torch.randn(n_frequencies, input_dim) * 0.1
)
# 频率振幅
self.amplitude_params = nn.Parameter(torch.ones(n_frequencies))
# 相位偏移
self.phase_params = nn.Parameter(torch.zeros(n_frequencies))
def forward(self, x):
"""
Args:
x: [batch, seq_len, input_dim]
Returns:
freq_features: [batch, n_frequencies]
"""
batch_size, seq_len, _ = x.shape
# 计算每个频率与输入的相关性
# x_flat: [batch * seq_len, input_dim]
x_flat = x.reshape(-1, x.shape[-1])
# 计算频率响应
# cosine_similarity: [batch * seq_len, n_frequencies]
cos_sim = torch.matmul(x_flat, self.freq_params.t())
# 应用振幅和相位
freq_response = self.amplitude_params.unsqueeze(0) * torch.cos(
cos_sim + self.phase_params.unsqueeze(0)
)
# 重新整形并池化
freq_response = freq_response.reshape(batch_size, seq_len, -1)
freq_features = freq_response.mean(dim=1) # 时间池化
return freq_features
class NFTTBlock(nn.Module):
"""
Neural Fourier Transform 块
结合频域特征和时域卷积
"""
def __init__(self, d_model=128, n_frequencies=64, kernel_size=3):
super().__init__()
self.fourier_transform = NeuralFourierTransform(d_model, n_frequencies)
self.freq_projection = nn.Linear(n_frequencies, d_model)
self.temporal_conv = nn.Conv1d(
d_model, d_model, kernel_size,
padding=kernel_size//2
)
self.norm = nn.LayerNorm(d_model)
def forward(self, x):
"""
x: [batch, seq_len, d_model]
"""
# 频域分支
freq_features = self.fourier_transform(x)
freq_out = self.freq_projection(freq_features)
# 时域分支
x_conv = x.transpose(1, 2) # [batch, d_model, seq_len]
temporal_out = self.temporal_conv(x_conv).transpose(1, 2) # [batch, seq_len, d_model]
# 融合
out = freq_out.unsqueeze(1) + temporal_out
out = self.norm(out)
return out六、实践指南
6.1 参数选择
| 参数 | 选择建议 |
|---|---|
| FFT 大小 (N) | 2的幂次,通常 256, 512, 1024, 2048 |
| 窗口类型 | 汉宁窗减少频谱泄漏,矩形窗分辨率最高 |
| 重叠比例 | Welch 方法建议 50-75% 重叠 |
| AR 模型阶数 | AIC/BIC 准则选择,或使用 到 |
6.2 频率分辨率
其中 是采样频率, 是 FFT 大小。
def frequency_resolution(sampling_rate, n_fft):
"""计算频率分辨率"""
return sampling_rate / n_fft
# 示例
print(f"256 点 FFT, 采样率 1000 Hz:")
print(f" 分辨率 = {frequency_resolution(1000, 256):.2f} Hz")
print(f"1024 点 FFT, 采样率 1000 Hz:")
print(f" 分辨率 = {frequency_resolution(1000, 1024):.2f} Hz")七、参考
相关阅读
Footnotes
-
Proakis, J. G., & Manolakis, D. G. (2007). Digital Signal Processing: Principles, Algorithms, and Applications. Prentice Hall. ↩