使用多处理模块运行并行进程,其中一个进程由另一个进程提供(依赖)维特比算法

Using multiprocessing module to runs parallel processes where one is fed (dependent) by the other for Viterbi Algorithm

我最近使用了 Python 的多处理模块来加速隐马尔可夫模型的前向-后向算法,因为前向过滤和反向过滤可以 运行 独立。看到 运行 时间减半是令人敬畏的事情。

我现在尝试在我的迭代 Viterbi algorithm.In 这个算法中包含一些多处理,我尝试 运行 的两个进程不是独立的。 val_max 部分可以 运行 独立但 arg_max[t] 取决于 val_max[t-1]。所以我玩了一个想法,可以 运行 val_max 作为一个单独的过程,然后 arg_max 也可以作为一个单独的过程,可以由 val_max.[=12= 提供]

我承认我在这里有点不知所云,除了观看一些基本视频和浏览博客外,我对多处理了解不多。我在下面提供了我的尝试,但它不起作用。


import numpy as np
from time import time,sleep
import multiprocessing as mp

class Viterbi:


    def __init__(self,A,B,pi):
        self.M = A.shape[0] # number of hidden states
        self.A = A  # Transition Matrix
        self.B = B   # Observation Matrix
        self.pi = pi   # Initial distribution
        self.T = None   # time horizon
        self.val_max = None
        self.arg_max = None
        self.obs = None
        self.sleep_time = 1e-6
        self.output = mp.Queue()


    def get_path(self,x):
        # returns the most likely state sequence given observed sequence x
        # using the Viterbi algorithm
        self.T = len(x)
        self.val_max = np.zeros((self.T, self.M))
        self.arg_max = np.zeros((self.T, self.M))
        self.val_max[0] = self.pi*self.B[:,x[0]]
        for t in range(1, self.T):
            # Indepedent Process
            self.val_max[t] = np.max( self.A*np.outer(self.val_max[t-1],self.B[:,obs[t]]) , axis = 0  ) 
            # Dependent Process
            self.arg_max[t] = np.argmax( self.val_max[t-1]*self.A.T, axis = 1)

        # BACKTRACK
        states = np.zeros(self.T, dtype=np.int32)
        states[self.T-1] = np.argmax(self.val_max[self.T-1])
        for t in range(self.T-2, -1, -1):
            states[t] = self.arg_max[t+1, states[t+1]]
        return states

    def get_val(self):
        '''Independent Process'''
        for t in range(1,self.T):
            self.val_max[t] = np.max( self.A*np.outer(self.val_max[t-1],self.B[:,self.obs[t]]) , axis = 0  ) 
        self.output.put(self.val_max)

    def get_arg(self):
        '''Dependent Process'''
        for t in range(1,self.T):
            while 1:
                # Process info if available
                if self.val_max[t-1].any() != 0:
                    self.arg_max[t] = np.argmax( self.val_max[t-1]*self.A.T, axis = 1)
                    break
                # Else sleep and wait for info to arrive
                sleep(self.sleep_time)
        self.output.put(self.arg_max)

    def get_path_parallel(self,x):
        self.obs = x
        self.T = len(obs)
        self.val_max = np.zeros((self.T, self.M))
        self.arg_max = np.zeros((self.T, self.M))
        val_process = mp.Process(target=self.get_val)
        arg_process = mp.Process(target=self.get_arg)  
        # get first initial value for val_max which can feed arg_process
        self.val_max[0] = self.pi*self.B[:,obs[0]]
        arg_process.start()
        val_process.start()
        arg_process.join()
        val_process.join()

注意:get_path_parallel还没有回溯。

val_process 和 arg_process 似乎从未真正 运行。真的不确定为什么会这样。您可以 运行 维特比算法的维基百科示例代码。

obs = np.array([0,1,2])  # normal then cold and finally dizzy  

pi = np.array([0.6,0.4])

A = np.array([[0.7,0.3],
             [0.4,0.6]])

B = np.array([[0.5,0.4,0.1],
             [0.1,0.3,0.6]]) 

viterbi = Viterbi(A,B,pi)
path = viterbi.get_path(obs)

我也试过使用 Ray。但是,我不知道我在那里到底在做什么。你能帮我推荐一下如何让并行版本达到 运行 吗?我一定是做错了什么,但我不知道是什么。

非常感谢您的帮助。

欢迎来到 SO。考虑查看在多处理中大量使用的 producer-consumer 模式。

请注意,Python 中的多处理会为您在 Windows 上创建的每个进程重新实例化您的代码。所以你的 Viterbi 对象和它们的 Queue 字段是不一样的。

通过以下方式观察此行为:

import os

def get_arg(self):
    '''Dependent Process'''
    print("Dependent ", self)
    print("Dependent ", self.output)
    print("Dependent ", os.getpid())

def get_val(self):
    '''Independent Process'''
    print("Independent ", self)
    print("Independent ", self.output)
    print("Independent ", os.getpid())

if __name__ == "__main__":
    print("Hello from main process", os.getpid())
    obs = np.array([0,1,2])  # normal then cold and finally dizzy  

    pi = np.array([0.6,0.4])

    A = np.array([[0.7,0.3],
             [0.4,0.6]])

    B = np.array([[0.5,0.4,0.1],
             [0.1,0.3,0.6]]) 

    viterbi = Viterbi(A,B,pi)
    print("Main viterbi object", viterbi)
    print("Main viterbi object queue", viterbi.output)
    path = viterbi.get_path_parallel(obs)

由于存在三个不同的进程,因此存在三个不同的维特比对象。因此,就并行性而言,您需要的不是进程。您应该探索 Python 提供的 threading 库。

感谢@SıddıkAçıl,我已经设法让我的代码正常工作。 producer-consumer 模式就是诀窍。我还意识到这些过程可以 运行 成功,但如果没有将最终结果存储在 "result queue" 中,那么它就会消失。我的意思是,我通过允许进程 start() 在我的 numpy 数组 val_max 和 arg_max 中填充了值,但是当我调用它们时,它们仍然是 np.zero 数组。我通过在进程即将终止时打印它们来验证它们确实填充了正确的数组(最后 self.T 在迭代中)。因此,我没有打印它们,而是在最后一次迭代中将它们添加到一个多处理队列对象中,以捕获整个填满的数组。

我在下面提供更新后的工作代码。注意:它可以正常工作,但完成时间是串行版本的两倍。我对为什么会这样的想法如下:

  1. 我可以将它作为两个进程发送到 运行,但实际上并不知道如何正确执行。有经验的程序员可能知道如何使用 chunksize 参数修复它。
  2. 我分离的两个进程是numpy矩阵运算。这些进程执行得如此之快,以至于并发(多处理)的开销不值得理论上的改进。如果这两个进程是两个原始的 for 循环(在维基百科和大多数实现中使用),那么多处理可能会带来收益(也许我应该对此进行调查)。此外,因为我们有一个 producer-consumer 模式而不是两个独立的进程(producer-producer 模式),我们只能期望 producer-consumer 模式 运行 只要两个中最长的一个进程(在这种情况下,生产者花费的时间是消费者的两倍)。我们不能期望 运行 时间会像 producer-producer 场景那样减半(这发生在我的并行 forward-backward HMM 过滤算法中)。
  3. 我的电脑有 4 个内核,numpy 已经对其操作进行了 built-in CPU 多处理优化。通过我尝试使用核心来使代码更快,我剥夺了 numpy 可以更有效地使用的核心。为了解决这个问题,我将对 numpy 操作进行计时,看看它们在我的并发版本中是否比我的串行版本慢。

如果有新的收获我会更新的。如果您可能知道我的并发代码慢得多的真正原因,请告诉我。这是代码:


import numpy as np
from time import time
import multiprocessing as mp

class Viterbi:


    def __init__(self,A,B,pi):
        self.M = A.shape[0] # number of hidden states
        self.A = A  # Transition Matrix
        self.B = B   # Observation Matrix
        self.pi = pi   # Initial distribution
        self.T = None   # time horizon
        self.val_max = None
        self.arg_max = None
        self.obs = None
        self.intermediate = mp.Queue()
        self.result = mp.Queue()



    def get_path(self,x):
        '''Sequential/Serial Viterbi Algorithm with backtracking'''
        self.T = len(x)
        self.val_max = np.zeros((self.T, self.M))
        self.arg_max = np.zeros((self.T, self.M))
        self.val_max[0] = self.pi*self.B[:,x[0]]
        for t in range(1, self.T):
            # Indepedent Process
            self.val_max[t] = np.max( self.A*np.outer(self.val_max[t-1],self.B[:,obs[t]]) , axis = 0  ) 
            # Dependent Process
            self.arg_max[t] = np.argmax( self.val_max[t-1]*self.A.T, axis = 1)

        # BACKTRACK
        states = np.zeros(self.T, dtype=np.int32)
        states[self.T-1] = np.argmax(self.val_max[self.T-1])
        for t in range(self.T-2, -1, -1):
            states[t] = self.arg_max[t+1, states[t+1]]
        return states

    def get_val(self,intial_val_max):
        '''Independent Poducer Process'''
        val_max = intial_val_max
        for t in range(1,self.T):
            val_max = np.max( self.A*np.outer(val_max,self.B[:,self.obs[t]]) , axis = 0  )
            #print('Transfer: ',self.val_max[t])
            self.intermediate.put(val_max)
            if t == self.T-1:
                self.result.put(val_max)   # we only need the last val_max value for backtracking




    def get_arg(self):
        '''Dependent Consumer Process.'''
        t = 1
        while t < self.T:
            val_max =self.intermediate.get()
            #print('Receive: ',val_max)
            self.arg_max[t] = np.argmax( val_max*self.A.T, axis = 1)
            if t == self.T-1:
                self.result.put(self.arg_max)
            #print('Processed: ',self.arg_max[t])
            t += 1

    def get_path_parallel(self,x):
        '''Multiprocessing producer-consumer implementation of Viterbi algorithm.'''
        self.obs = x
        self.T = len(obs)
        self.arg_max = np.zeros((self.T, self.M))  # we don't tabulate val_max anymore
        initial_val_max = self.pi*self.B[:,obs[0]]
        producer_process = mp.Process(target=self.get_val,args=(initial_val_max,),daemon=True)
        consumer_process = mp.Process(target=self.get_arg,daemon=True) 
        self.intermediate.put(initial_val_max)  # initial production put into pipeline for consumption
        consumer_process.start()  # we can already consume initial_val_max
        producer_process.start()
        #val_process.join()
        #arg_process.join()
        #self.output.join()
        return self.backtrack(self.result.get(),self.result.get()) # backtrack takes last row of val_max and entire arg_max

    def backtrack(self,val_max_last_row,arg_max):
        '''Backtracking the Dynamic Programming solution (actually a Trellis diagram)
           produced by Multiprocessing Viterbi algorithm.'''
        states = np.zeros(self.T, dtype=np.int32)
        states[self.T-1] = np.argmax(val_max_last_row)
        for t in range(self.T-2, -1, -1):
            states[t] = arg_max[t+1, states[t+1]]
        return states



if __name__ == '__main__':

    obs = np.array([0,1,2])  # normal then cold and finally dizzy  

    T = 100000
    obs = np.random.binomial(2,0.3,T)        

    pi = np.array([0.6,0.4])

    A = np.array([[0.7,0.3],
                 [0.4,0.6]])

    B = np.array([[0.5,0.4,0.1],
                 [0.1,0.3,0.6]]) 

    t1 = time()
    viterbi = Viterbi(A,B,pi)
    path = viterbi.get_path(obs)
    t2 = time()
    print('Iterative Viterbi')
    print('Path: ',path)
    print('Run-time: ',round(t2-t1,6)) 
    t1 = time()
    viterbi = Viterbi(A,B,pi)
    path = viterbi.get_path_parallel(obs)
    t2 = time()
    print('\nParallel Viterbi')
    print('Path: ',path)
    print('Run-time: ',round(t2-t1,6))