Python 中的收缩阵列模拟

Systolic Array Simulation in Python

我正在尝试模拟脉动阵列结构——所有这些都是我从这些幻灯片中学到的:http://web.cecs.pdx.edu/~mperkows/temp/May22/0020.Matrix-multiplication-systolic.pdf——用于 Python 环境中的矩阵乘法。脉动阵列的一个组成部分是 PE 之间的数据流与任何一个节点上发生的任何乘法或加法并发。我很难推测如何在 Python 中准确地实现这样的并发过程。具体来说,我希望更好地理解一种计算方法,以级联方式将要相乘的矩阵元素馈送到脉动阵列,同时允许这些元素以并发方式在阵列中传播。

我已经开始在 python 中编写一些代码来将两个 3 x 3 数组相乘,但最终,我想模拟任何大小的脉动数组以处理任何大小的 a 和 b 矩阵。

from threading import Thread
from collections import deque

vals_deque = deque(maxlen=9*2)#will hold the interconnections between nodes of the systolicarray
dump=deque(maxlen=9) # will be the output of the SystolicArray
prev_size = 0

def setupSystolicArray():
    global SystolicArray
    SystolicArray = [NodeSystolic(i,j) for i in range(3), for i in range(3)]

def spreadInputs(a,b):
    #needs some way to initially propagate the elements of a and b through the top and leftmost parts of the systolic array

    new = map(lambda x: x.start() , SystolicArray) #start all the nodes of the systolic array, they are waiting for an input

    #even if i found a way to put these inputs into the array at the start, I am not sure how to coordinate future inputs into the array in the cascading fashion described in the slides
    while(len(dump) != 9):
        if(len(vals_deque) != prev_size):

            vals = vals_deque[-1]
            row = vals['t'][0]
            col = vals['l'][0]
            a= vals['t'][1]
            b = vals['l'][1]
            # these if elif statements track whether the outputs are at the edge of the systolic array and can be removed
            if(row >= 3):
                dump.append(a)
            elif(col >= 3):
                dump.append(b)
            else:
                #something is wrong with the logic here
                SystolicArray[t][l-1].update(a,b)
                SystolicArray[t-1][l].update(a,b)   

class NodeSystolic:
    def __init__(self,row, col):
        self.row = row
        self.col = col
        self.currval = 0
        self.up = False
        self.ain = 0#coming in from the top
        self.bin = 0#coming in from the side

    def start(self):
        Thread(target=self.continuous, args = ()).start()

    def continuous(self):
        while True:
            if(self.up = True):
                self.currval = self.ain*self.bin
                self.up = False
                self.passon(self.ain, self.bin)
            else:
                pass

    def update(self, left, top):
        self.up = True
        self.ain = top
        self.bin = left

    def passon(self, t,l):
        #this will passon the inputs this node has received onto the next nodes
        vals_deque.append([{'t': [self.row+ 1, self.ain], 'l': [self.col + 1, self.bin]}])

    def returnValue(self):
        return self.currval


def main():
    a = np.array([
    [1,2,3],
    [4,5,6],
    [7,8,9],
    ])

    b = np.array([
    [1,2,3],
    [4,5,6],
    [7,8,9]
    ])

    setupSystolicArray()
    spreadInputs(a,b)

以上代码无法运行,还有很多bug。我希望有人可以指导我如何改进代码,或者是否有更简单的方法来模拟具有异步属性的脉动数组的并行过程 Python,因此具有非常大的脉动数组大小,我将不必担心创建太多线程(节点)。

考虑在 Python 中模拟脉动阵列很有趣,但我认为按照您在上面勾勒的路线进行此操作存在一些重大困难。

最重要的是 Python 由 Global Interpreter Lock. This means that you won't get any significant parallelism for compute-limited tasks, and its threads are probably best suited to handling I/O limited tasks such as web-requests or filesystem accesses. The nearest Python can get to this is probably via the multiprocessing 模块引起的真正并行性范围有限的问题,但这需要为每个节点单独处理。

其次,即使您要在脉动数组中的数值运算中获得并行性,您也需要一些 locking 机制来允许不同的线程交换数据(或消息)而不会损坏当他们尝试同时读取和写入数据时,彼此的内存。

关于您示例中的数据结构,我认为您最好让脉动数组中的每个节点都引用其上游节点,而不是知道它位于 NxM 网格中的特定位置。我认为脉动阵列没有任何理由需要是矩形网格,并且任何有向无环图 (DAG) 仍然具有高效分布式计算的潜力。

总的来说,我预计在 Python 中进行此模拟的计算开销相对于 Scala 或 C++ 等低级语言所能实现的开销而言是巨大的。即便如此,除非脉动阵列中的每个节点都在进行大量计算(即远远超过一些乘加),否则节点之间交换消息的开销将是巨大的。因此,我认为您的模拟主要是为了了解数据流和阵列的高级行为,而不是接近定制 DSP(数字信号处理)硬件可以提供的功能。如果是这样的话,那么我很想在没有线程的情况下使用集中式消息队列,所有节点都向该队列提交由全局消息分发机制传递的消息。