EM算法

EM算法的引入

概率模型有时既含观测变量,又含隐变量。如果概率模型的变量都是观测变量,那么给定数据,可以直接用极大似然估计法,或贝叶斯估计法估计模型参数。但是,当模型含有隐变量时,就不能简单地使用这些估计方法。EM算法就是含有隐变量的概率模型参数的极大似然估计法,或极大后验概率估计法。我们仅讨论极大似然估计,极大后验概率估计与其类似。

三硬币模型

假设有3枚硬币,分别记作$A, B, C$。这些硬币正面出现的概率分别是$\pi, p, q$。进行如下投掷试验:先掷硬币$A$,根据其结果选出硬币$B$或硬币$C$,正面选硬币$B$,反而选硬币$C$;然后掷选出的硬币,掷硬币的结果,出现正面记作1,出现反面记作0;独立地重复$n$次试验(这里$n=10$),预测结果如下:1, 1, 0, 1, 0, 0, 1, 0, 1, 1。

假设只能观测到掷硬币的结果,不能观测掷硬币的过程。问如何估计三枚硬币分别出现正面的概率,即三硬币模型的参数。

$$ \begin{aligned} P(y|\theta) &=\sum_{z} P(y, z|\theta)=\sum_{z} P(z|\theta) P(y|z, \theta) \\ &=\pi p^{y}(1-p)^{1-y}+(1-\pi) q^{y}(1-q)^{1-y} \end{aligned} $$

其中,随机变量$y$是观测变量,表示一次试验观测的结果是0或1;随机变量$z$为隐变量,表示未观测到的硬币$A$的结果;$\theta=(\pi,p,q)$是模型参数。这一模型是以上数据的生成模型。注意,随机变量$y$的数据可以观测,随机变量$z$的数据不可观测

EM算法过程

$$ P(Y|\theta)=\sum_Z P(Z|\theta)P(Y|Z,\theta) $$$$ P(Y|\theta)=\prod_{j=1}^{n}[\pi p^{y_j}(1-p)^{1-y_j}+(1-\pi)q^{y_j}(1-q)^{1-y_j}] $$$$ \hat \theta=\arg\max_\theta \log P(Y|\theta) $$

这个问题没有解析解,只有通过迭代的方法求解。EM算法就是可以用于求解这个问题的一种迭代算法,下面给出针对三硬币问题的EM算法:

EM算法首先选取参数的初值,记作$\theta^{(0)}=(\pi^{(0)},p^{(0)},q^{(0)})$,然后通过下面的步骤迭代计算参数的估计值,直至收敛为止。第$i$次迭代参数的估计值为$\theta^{(i)}=(\pi^{(i)},p^{(i)},q^{(i)})$。EM算法第$i+1$次迭代如下:

$$ \mu^{(i+1)}=\frac{\pi^{(i)}(p^{(i)})^{y_{j}}(1-p^{(i)})^{1-y_{j}}}{\pi^{(i)}(p^{(i)})^{y_{j}}(1-p^{(i)})^{1-y_{j}}+(1-\pi^{(i)})(q^{(i)})^{y_{j}}(1-q^{(i)})^{1-y_{j}}} $$$$ \begin{aligned} \pi^{(i+1)}=\frac{1}{n} \sum_{j=1}^{n} \mu_{j}^{(i+1)} \\ p^{(i+1)}=\frac{\sum_{j=1}^{n} \mu_{j}^{(i+1)} y_{j}}{\sum_{j=1}^{n} \mu_{j}^{(i+1)}} \\ q^{(i+1)}=\frac{\sum_{j=1}^{n}(1-\mu_{j}^{(i+1)}) y_{j}}{\sum_{j=1}^{n}(1-\mu_{j}^{(i+1)})} \end{aligned} $$

可以算得,如果取初值$\pi^{(0)}=0.4,p^{(0)}=0.6,q^{(0)}=0.7$,那么得到的模型参数的极大似然估计中,三者的值分别为0.4064, 0.5368和0.6432。EM算法与初值的选择有关,选择不同的初值可能得到不同的参数估计值。

一般用$Y$表示观测随机变量的数据,$Z$表示隐随机变量的数据。$Y$和$Z$连在一起称为完全数据(complete data),观测数据$Y$又称为不完全数据(incomplete data)。假设给定观测数据$Y$,其概率分布是$P(Y|\theta)$,其中$\theta$是需要估计的模型参数,那么不完全数据$Y$的似然函数是$P(Y|\theta)$,对数似然函数$L(\theta)=\log P(Y|\theta)$;假设$Y$和$Z$的联合概率分布是$P(Y,Z|\theta)$,那么完全数据的对数似然函数是$\log P(Y,Z|\theta)$。

EM算法通过迭代求$L(\theta)=\log P(Y|\theta)$的极大似然估计。每次迭代包含两步:E步,求期望;M步,求极大化。以下为EM算法的描述

输入:观测变量数据$Y$,隐变量数据$Z$,联合分布$P(Y,Z|\theta)$,条件分布$P(Z|Y,\theta)$;

输出:模型参数$\theta$。

(1) 选择参数的初值$\theta^{(0)}$,开始迭代;

$$ \begin{aligned} Q(\theta,\theta^{(i)})&=E_Z[\log P(Y,Z|\theta)|Y,\theta^{(i)}]\\ &=\sum_Z \log P(Y,Z|\theta)P(Z|Y,\theta^{(i)}) \end{aligned} $$

这里,$P(Z|Y,\theta^{(i)})$是在给定观测数据$Y$和当前的参数估计$\theta^{(i)}$下隐变量数据$Z$的条件概率分布;

$$ \theta^{(i+1)}=\arg\max_\theta Q(\theta,\theta^{(i)}) $$

(4) 重复(2)和(3),直到收敛。

$$ Q(\theta,\theta^{(i)})=E_Z[\log P(Y,Z|\theta)|Y,\theta^{(i)}] $$

下面关于EM算法作几点说明:

步骤(1) 参数的初值可以任意选择,但需注意EM算法对初值是敏感的。

步骤(2) E步求$Q$函数时,$Q$函数式中$Z$是未观测数据,$Y$是观测数据。注意,$Q(\theta,\theta^{(i)})$的第一个变元表示要极大化的参数,第二个变元表示参数的当前估计值。每次迭代实际在求$Q$函数及其极大。

步骤(3) M步求$Q$函数的极大化,得到$\theta^{(i+1)}$,完成一次迭代。每次迭代使似然函数增大或达到局部极值

$$ \|\theta^{(i+1)}-\theta^{(i)}\|<\varepsilon_1 $$$$ \|Q(\theta^{(i+1)},\theta^{(i)})-Q(\theta^{(i)},\theta^{(i)})\|<\varepsilon_2 $$

则停止迭代。

EM算法的Python实现

import numpy as np
import random
import math
import time


def loadData(mu0, sigma0, mu1, sigma1, alpha0, alpha1):
    """
    初始化数据集
    这里通过服从高斯分布的随机函数来伪造数据集
    :param mu0: 高斯0的均值
    :param sigma0: 高斯0的方差
    :param mu1: 高斯1的均值
    :param sigma1: 高斯1的方差
    :param alpha0: 高斯0的系数
    :param alpha1: 高斯1的系数
    :return: 混合了两个高斯分布的数据
    """
    length = 1000  # 定义数据集长度为1000

    # 初始化第一个高斯分布,生成数据,数据长度为length * alpha系数,以此来满足alpha的作用
    data0 = np.random.normal(mu0, sigma0, int(length * alpha0))
    # 第二个高斯分布的数据
    data1 = np.random.normal(mu1, sigma1, int(length * alpha1))

    # 初始化总数据集,两个高斯分布的数据混合后会放在该数据集中返回
    dataSet = []
    dataSet.extend(data0)  # 将第一个数据集的内容添加进去
    dataSet.extend(data1)  # 添加第二个数据集的数据
    random.shuffle(dataSet)  # 对总的数据集进行打乱
    return dataSet  # 返回数据集


def calcGauss(dataSetArr, mu, sigmod):
    """
    根据高斯密度函数计算值
    注:在公式中y是一个实数,但是在EM算法中,需要对每个j都求一次yjk,在本实例
    中有1000个可观测数据,因此需要计算1000次。考虑到在E步时进行1000次高斯计
    算,程序上比较不简洁,因此这里的y是向量,在numpy的exp中如果exp内部值为向
    量,则对向量中每个值进行exp,输出仍是向量的形式。所以使用向量的形式1次计算
    即可将所有计算结果得出,程序上较为简洁。
    :param dataSetArr: 可观测数据集
    :param mu: 均值
    :param sigmod: 方差
    :return: 整个可观测数据集的高斯分布密度(向量形式)
    """
    result = (1 / (math.sqrt(2 * math.pi) * sigmod ** 2)) * np.exp(
        -1 * (dataSetArr - mu) * (dataSetArr - mu) / (2 * sigmod ** 2))
    return result


def E_step(dataSetArr, alpha0, mu0, sigmod0, alpha1, mu1, sigmod1):
    """
    EM算法中的E步
    依据当前模型参数,计算分模型k对观数据y的响应度
    :param dataSetArr: 可观测数据y
    :param alpha0: 高斯模型0的系数
    :param mu0: 高斯模型0的均值
    :param sigmod0: 高斯模型0的方差
    :param alpha1: 高斯模型1的系数
    :param mu1: 高斯模型1的均值
    :param sigmod1: 高斯模型1的方差
    :return: 两个模型各自的响应度
    """
    # 计算y0的响应度
    gamma0 = alpha0 * calcGauss(dataSetArr, mu0, sigmod0)  # 模型0的响应度的分子
    gamma1 = alpha1 * calcGauss(dataSetArr, mu1, sigmod1)  # 模型1响应度的分子
    sum = gamma0 + gamma1  # 两者相加为E步中的分布
    gamma0 = gamma0 / sum  # 各自相除,得到两个模型的响应度
    gamma1 = gamma1 / sum

    return gamma0, gamma1  # 返回两个模型响应度


def M_step(muo, mu1, gamma0, gamma1, dataSetArr):
    mu0_new = np.dot(gamma0, dataSetArr) / np.sum(gamma0)
    mu1_new = np.dot(gamma1, dataSetArr) / np.sum(gamma1)

    sigmod0_new = math.sqrt(np.dot(gamma0, (dataSetArr - muo) ** 2) / np.sum(gamma0))
    sigmod1_new = math.sqrt(np.dot(gamma1, (dataSetArr - mu1) ** 2) / np.sum(gamma1))

    alpha0_new = np.sum(gamma0) / len(gamma0)
    alpha1_new = np.sum(gamma1) / len(gamma1)

    # 将更新的值返回
    return mu0_new, mu1_new, sigmod0_new, sigmod1_new, alpha0_new, alpha1_new


def EM_Train(dataSetList, iter=500):
    """
    根据EM算法进行参数估计
    :param dataSetList:数据集(可观测数据)
    :param iter: 迭代次数
    :return: 估计的参数
    """
    # 将可观测数据y转换为数组形式,主要是为了方便后续运算
    dataSetArr = np.array(dataSetList)

    # 对参数取初值,开始迭代
    alpha0 = 0.5
    mu0 = 0
    sigmod0 = 1
    alpha1 = 0.5
    mu1 = 1
    sigmod1 = 1

    step = 0  # 开始迭代
    while step < iter:
        step += 1  # 每次进入一次迭代后迭代次数加1
        # E步:依据当前模型参数,计算分模型k对观测数据y的响应度
        gamma0, gamma1 = E_step(dataSetArr, alpha0, mu0, sigmod0, alpha1, mu1, sigmod1)
        # M步
        mu0, mu1, sigmod0, sigmod1, alpha0, alpha1 = M_step(mu0, mu1, gamma0, gamma1, dataSetArr)
    return alpha0, mu0, sigmod0, alpha1, mu1, sigmod1  # 迭代结束后将更新后的各参数返回


if __name__ == '__main__':
    start = time.time()

    alpha0 = 0.3  # 系数α
    mu0 = -2  # 均值μ
    sigmod0 = 0.5  # 方差σ

    alpha1 = 0.7  # 系数α
    mu1 = 0.5  # 均值μ
    sigmod1 = 1  # 方差σ

    # 初始化数据集
    dataSetList = loadData(mu0, sigmod0, mu1, sigmod1, alpha0, alpha1)

    # 打印设置的参数
    print('---------------------------')
    print('the Parameters set is:')
    print('alpha0:%.1f, mu0:%.1f, sigmod0:%.1f, alpha1:%.1f, mu1:%.1f, sigmod1:%.1f' % (
        alpha0, alpha1, mu0, mu1, sigmod0, sigmod1
    ))

    # 开始EM算法,进行参数估计
    alpha0, mu0, sigmod0, alpha1, mu1, sigmod1 = EM_Train(dataSetList)

    # 打印参数预测结果
    print('----------------------------')
    print('the Parameters predict is:')
    print('alpha0:%.1f, mu0:%.1f, sigmod0:%.1f, alpha1:%.1f, mu1:%.1f, sigmod1:%.1f' % (
        alpha0, alpha1, mu0, mu1, sigmod0, sigmod1
    ))

    print('----------------------------')
    print('time span:', time.time() - start)

参考资料

  • 李航. 统计学习方法. 北京: 清华大学出版社, 2019.

  • EM算法的实现:https://www.cnblogs.com/chenxiangzhen/p/10435969.html