Baum-Welch与前向后向算法

概述

前向后向算法(Forward-Backward Algorithm)和Baum-Welch算法是隐马尔可夫模型(HMM)的核心推断和学习算法12。它们在语音识别、自然语言处理、生物信息学等领域有广泛应用。

核心任务:从观测序列推断隐状态序列(解码),并学习模型参数(学习)。


1. 隐马尔可夫模型回顾

1.1 HMM定义

HMM由以下五元组定义:

符号含义维度
隐状态数量标量
观测符号数量标量
状态转移矩阵
发射矩阵
初始状态分布

1.2 三个基本问题

问题描述解法
评估给定观测序列 ,计算 前向/后向算法
解码找到最可能的隐状态序列Viterbi算法
学习调整参数 最大化 Baum-Welch算法

2. 前向算法

2.1 问题定义

目标:计算给定模型 下,观测序列 的概率:

直接计算复杂度为 ,前向算法将复杂度降低到

2.2 前向变量定义

定义前向变量 为时刻 处于状态 ,且观测到 的联合概率:

2.3 递归推导

初始化):

递归):

几何解释:

时刻 t-1:  α_{t-1}(1) ──► 状态1 ──► α_t(j)
           α_{t-1}(2) ──► 状态2 ──►
           ...           ...
           α_{t-1}(N) ──► 状态N ──►
                               │
                               ▼
                           发射 b_j(o_t)
                               │
                               ▼
                          α_t(j)

2.4 终止

2.5 数值稳定性:Log域实现

较大时, 会趋近于0,导致数值下溢。解决方案是使用对数域计算:

import numpy as np
 
class HMMForward:
    """前向算法的对数域实现"""
    
    def __init__(self, A, B, pi):
        """
        参数:
            A: 状态转移矩阵 [N, N]
            B: 发射矩阵 [N, M]
            pi: 初始分布 [N]
        """
        self.A = A
        self.B = B
        self.pi = pi
        self.N = len(pi)
    
    def forward_log(self, observations):
        """
        对数域前向算法
        
        返回:
            log_prob: log P(O|lambda)
            alphas_log: 对数前向变量 [T, N]
        """
        T = len(observations)
        N = self.N
        
        alphas_log = np.zeros((T, N))
        
        # 初始化
        alphas_log[0] = np.log(self.pi) + np.log(self.B[:, observations[0]])
        
        # 递归
        for t in range(1, T):
            for j in range(N):
                # log-sum-exp技巧
                log_trans = np.log(self.A[:, j])
                log_alpha_sum = alphas_log[t-1] + log_trans
                alphas_log[t, j] = np.max(log_alpha_sum) + \
                    np.log(np.sum(np.exp(log_alpha_sum - np.max(log_alpha_sum)))) + \
                    np.log(self.B[j, observations[t]])
        
        # 终止
        log_prob = np.max(alphas_log[-1]) + \
            np.log(np.sum(np.exp(alphas_log[-1] - np.max(alphas_log[-1]))))
        
        return log_prob, alphas_log
    
    def forward_scaled(self, observations):
        """
        缩放前向算法
        
        返回:
            log_prob: log P(O|lambda)
            alphas: 缩放后的前向变量 [T, N]
            scales: 缩放因子 [T]
        """
        T = len(observations)
        N = self.N
        
        alphas = np.zeros((T, N))
        scales = np.zeros(T)
        
        # 初始化
        alphas[0] = self.pi * self.B[:, observations[0]]
        scales[0] = np.sum(alphas[0])
        alphas[0] = alphas[0] / scales[0]
        
        # 递归
        for t in range(1, T):
            alphas[t] = np.dot(alphas[t-1], self.A) * self.B[:, observations[t]]
            scales[t] = np.sum(alphas[t])
            alphas[t] = alphas[t] / scales[t]
        
        # log概率
        log_prob = np.sum(np.log(scales))
        
        return log_prob, alphas, scales

3. 后向算法

3.1 问题定义

目标:计算给定模型 下,从时刻 的观测在时刻 处于状态 的条件概率:

3.2 后向变量定义

3.3 递归推导

初始化):

递归):

3.4 Python实现

class HMMBackward:
    """后向算法的缩放实现"""
    
    def __init__(self, A, B):
        self.A = A
        self.B = B
        self.N = A.shape[0]
    
    def backward_scaled(self, observations, scales):
        """
        缩放后向算法
        
        参数:
            observations: 观测序列
            scales: 前向算法的缩放因子
        返回:
            betas: 缩放后的后向变量 [T, N]
        """
        T = len(observations)
        N = self.N
        
        betas = np.zeros((T, N))
        
        # 初始化
        betas[-1] = np.ones(N)
        
        # 递归(逆时间方向)
        for t in range(T-2, -1, -1):
            betas[t] = np.dot(self.A, self.B[:, observations[t+1]] * betas[t+1])
            betas[t] = betas[t] / scales[t+1]
        
        return betas

4. 前向后向算法:计算期望

4.1 单点概率

时刻 处于状态 的后验概率:

使用缩放版本:

4.2 两点概率

时刻 处于状态 ,时刻 处于状态 的联合后验概率:

使用缩放版本:

4.3 期望的直观解释

期望物理意义应用
处于状态 的期望次数更新
从状态 转移到 的期望次数更新
在状态 观测到 的期望次数更新

5. Baum-Welch算法:EM学习

5.1 EM框架

Baum-Welch算法是EM算法在HMM中的特例3

  • E步:计算隐变量的后验期望(使用当前参数)
  • M步:最大化完整数据对数似然的期望

5.2 参数重估计公式

初始分布

状态转移矩阵

发射矩阵

5.3 完整实现

class BaumWelch:
    """Baum-Welch算法实现"""
    
    def __init__(self, n_states, n_obs):
        self.n_states = n_states
        self.n_obs = n_obs
        self._init_parameters()
    
    def _init_parameters(self):
        """随机初始化参数"""
        self.A = np.random.rand(self.n_states, self.n_states)
        self.A = self.A / self.A.sum(axis=1, keepdims=True)
        
        self.B = np.random.rand(self.n_states, self.n_obs)
        self.B = self.B / self.B.sum(axis=1, keepdims=True)
        
        self.pi = np.random.rand(self.n_states)
        self.pi = self.pi / self.pi.sum()
    
    def _forward_scaled(self, observations):
        """缩放前向算法"""
        T = len(observations)
        alphas = np.zeros((T, self.n_states))
        scales = np.zeros(T)
        
        alphas[0] = self.pi * self.B[:, observations[0]]
        scales[0] = np.sum(alphas[0])
        alphas[0] = alphas[0] / scales[0]
        
        for t in range(1, T):
            alphas[t] = np.dot(alphas[t-1], self.A) * self.B[:, observations[t]]
            scales[t] = np.sum(alphas[t])
            alphas[t] = alphas[t] / scales[t]
        
        return alphas, scales
    
    def _backward_scaled(self, observations, scales):
        """缩放后向算法"""
        T = len(observations)
        betas = np.zeros((T, self.n_states))
        
        betas[-1] = np.ones(self.n_states)
        
        for t in range(T-2, -1, -1):
            betas[t] = np.dot(self.A, self.B[:, observations[t+1]] * betas[t+1])
            betas[t] = betas[t] / scales[t+1]
        
        return betas
    
    def _compute_gamma(self, alphas, betas):
        """计算gamma"""
        return alphas * betas / (alphas * betas).sum(axis=1, keepdims=True)
    
    def _compute_xi(self, observations, alphas, betas, scales):
        """计算xi"""
        T = len(observations)
        xi = np.zeros((T-1, self.n_states, self.n_states))
        
        for t in range(T-1):
            for i in range(self.n_states):
                for j in range(self.n_states):
                    xi[t, i, j] = alphas[t, i] * self.A[i, j] * \
                                   self.B[j, observations[t+1]] * betas[t+1, j]
            xi[t] = xi[t] / xi[t].sum()
        
        return xi
    
    def train(self, observations, max_iter=100, tol=1e-6, verbose=True):
        """
        Baum-Welch训练
        
        参数:
            observations: 观测序列列表 [[o1, o2, ...], ...]
            max_iter: 最大迭代次数
            tol: 收敛阈值
        返回:
            log_likelihoods: 每次迭代的对数似然
        """
        log_likelihoods = []
        
        for iteration in range(max_iter):
            # E步:计算期望
            total_gamma = np.zeros(self.n_states)
            total_xi = np.zeros((self.n_states, self.n_states))
            total_B = np.zeros((self.n_states, self.n_obs))
            
            for obs_seq in observations:
                alphas, scales = self._forward_scaled(obs_seq)
                betas = self._backward_scaled(obs_seq, scales)
                
                gamma = self._compute_gamma(alphas, betas)
                xi = self._compute_xi(obs_seq, alphas, betas, scales)
                
                total_gamma += gamma.sum(axis=0)
                total_xi += xi.sum(axis=0)
                for t, obs in enumerate(obs_seq):
                    total_B[:, obs] += gamma[t]
            
            # M步:更新参数
            self.pi = total_gamma[0] / total_gamma.sum() if total_gamma[0] > 0 else self.pi
            self.A = total_xi / total_gamma.sum(axis=1, keepdims=True)
            self.A = np.clip(self.A, 1e-10, None)  # 避免零概率
            self.A = self.A / self.A.sum(axis=1, keepdims=True)
            
            self.B = total_B / total_gamma.sum(axis=0, keepdims=True).T
            self.B = np.clip(self.B, 1e-10, None)
            self.B = self.B / self.B.sum(axis=1, keepdims=True)
            
            # 计算对数似然
            log_prob = 0
            for obs_seq in observations:
                _, scales = self._forward_scaled(obs_seq)
                log_prob += np.sum(np.log(scales + 1e-10))
            log_likelihoods.append(log_prob)
            
            if verbose and (iteration + 1) % 10 == 0:
                print(f"Iter {iteration+1}: Log-likelihood = {log_prob:.4f}")
            
            # 收敛检查
            if len(log_likelihoods) > 1:
                if abs(log_likelihoods[-1] - log_likelihoods[-2]) < tol:
                    if verbose:
                        print(f"Converged at iteration {iteration+1}")
                    break
        
        return log_likelihoods

6. Viterbi算法:最优路径解码

6.1 问题定义

目标:找到最可能产生观测序列 的隐状态序列

6.2 算法推导

定义Viterbi变量 为时刻 到达状态 的最大概率:

递归

同时记录回溯指针:

6.3 实现

def viterbi(A, B, pi, observations):
    """
    Viterbi算法:最优路径解码
    
    参数:
        A: 转移矩阵 [N, N]
        B: 发射矩阵 [N, M]
        pi: 初始分布 [N]
        observations: 观测序列 [T]
    
    返回:
        path: 最优状态序列 [T]
        log_prob: 最优路径的log概率
    """
    N = len(pi)
    T = len(observations)
    
    delta = np.zeros((T, N))
    psi = np.zeros((T, N), dtype=int)
    
    # 初始化
    delta[0] = pi * B[:, observations[0]]
    delta[0] = delta[0] / delta[0].sum()  # 归一化
    log_delta = np.log(delta[0] + 1e-10)
    
    # 递归
    for t in range(1, T):
        for j in range(N):
            trans_log = log_delta + np.log(A[:, j] + 1e-10)
            psi[t, j] = np.argmax(trans_log)
            log_delta[j] = trans_log[psi[t, j]] + np.log(B[j, observations[t]] + 1e-10)
    
    # 回溯
    path = np.zeros(T, dtype=int)
    path[-1] = np.argmax(log_delta)
    for t in range(T-2, -1, -1):
        path[t] = psi[t+1, path[t+1]]
    
    log_prob = np.max(log_delta)
    
    return path, log_prob

7. 完整示例:天气预测

def demo_weather_hmm():
    """天气预测HMM演示"""
    
    # 定义HMM
    # 状态: 0=晴天, 1=阴天, 2=雨天
    # 观测: 0=dry, 1=damp, 2=soggy
    
    A = np.array([
        [0.7, 0.2, 0.1],  # 晴天转移
        [0.3, 0.4, 0.3],  # 阴天转移
        [0.2, 0.3, 0.5],  # 雨天转移
    ])
    
    B = np.array([
        [0.8, 0.15, 0.05],  # 晴天的观测
        [0.3, 0.4, 0.3],    # 阴天的观测
        [0.1, 0.3, 0.6],   # 雨天的观测
    ])
    
    pi = np.array([0.6, 0.3, 0.1])  # 初始分布
    
    # 生成观测序列
    np.random.seed(42)
    true_states = []
    observations = []
    
    state = np.random.choice(3, p=pi)
    for _ in range(20):
        true_states.append(state)
        obs = np.random.choice(3, p=B[state])
        observations.append(obs)
        state = np.random.choice(3, p=A[state])
    
    # 前向算法:计算概率
    hmm = HMMForward(A, B, pi)
    log_prob, alphas = hmm.forward_log(observations)
    print(f"观测序列概率 (log): {log_prob:.4f}")
    
    # Viterbi解码:找到最可能的状态序列
    path, path_prob = viterbi(A, B, pi, observations)
    print(f"\n真实状态: {true_states}")
    print(f"解码状态: {path.tolist()}")
    print(f"解码概率 (log): {path_prob:.4f}")
    
    # Baum-Welch训练
    print("\n--- Baum-Welch训练 ---")
    bw = BaumWelch(3, 3)
    bw.A = A.copy()
    bw.B = B.copy()
    bw.pi = pi.copy()
    
    # 添加一些噪声到初始参数
    bw.A += np.random.randn(3, 3) * 0.1
    bw.A = bw.A / bw.A.sum(axis=1, keepdims=True)
    bw.B += np.random.randn(3, 3) * 0.1
    bw.B = bw.B / bw.B.sum(axis=1, keepdims=True)
    
    print("训练前转移矩阵:\n", np.round(bw.A, 3))
    print("训练前发射矩阵:\n", np.round(bw.B, 3))
    
    # 训练
    log_liks = bw.train([observations], max_iter=100, verbose=True)
    
    print("\n训练后转移矩阵:\n", np.round(bw.A, 3))
    print("训练后发射矩阵:\n", np.round(bw.B, 3))
    print("训练后初始分布:\n", np.round(bw.pi, 3))
 
# 运行演示
demo_weather_hmm()

8. 实践注意事项

8.1 数值稳定性

问题解决方案
下溢使用对数域或缩放
零概率添加拉普拉斯平滑
收敛慢良好的参数初始化

8.2 初始化策略

  • 均匀初始化
  • 数据驱动:使用K-means初始化发射参数
  • 多次运行:使用不同初始化选择最优

8.3 多序列训练

当有多个观测序列时,将所有序列的期望加和:


9. 总结

算法对比

算法用途时间复杂度
前向算法计算
后向算法计算条件概率
前向后向计算后验期望
Viterbi解码最优路径
Baum-Welch参数学习

扩展方向

  • 高斯HMM:连续观测用高斯混合模型
  • 深度HMM:神经网络发射模型
  • 因子HMM:层次结构
  • 条件HMM:上下文相关

参考资料


相关主题

Footnotes

  1. Rabiner, L. (1989). “A Tutorial on Hidden Markov Models and Selected Applications in Speech Recognition.” Proceedings of the IEEE, 77(2), 257-286.

  2. Baum, L. E., & Petrie, T. (1966). “Statistical Inference for Probabilistic Functions of Finite State Markov Chains.” The Annals of Mathematical Statistics, 37(6), 1554-1563.

  3. Dempster, A. P., Laird, N. M., & Rubin, D. B. (1977). “Maximum Likelihood from Incomplete Data via the EM Algorithm.” Journal of the Royal Statistical Society.