class 的多处理成员函数存储在 python 中的数组中

Multi Processing member function of a class store in array in python

我主要使用 C、C++ 编程,最近将一个项目转换为 python。除了我无法轻松转换多处理。

在示例中,我有一个用球 class 填充的数组,它有一个名为 update 的成员函数,该函数传入了 3 个变量。

下面就是了。它存储在一个名为 balls 的数组中。 我浏览了足够多的 post 文档和视频,但没有找到任何内容来涵盖这方面的一些内容,但没有说明如何处理传入的变量。

理想情况下,我会创建一个进程拉取,让它在它们之间拆分工作。 我需要检索对象并更新原始过程中的对象 space。

不确定,但看起来强制它输出元组然后用所有数据更新 class 并编写另一个函数来更新 class 可能更容易。

欢迎在 python 中提供有关执行此操作的最佳方法的反馈。此外,我更欣赏性能而不是简单地做某事。毕竟这就是这样做的意义所在。 提前致谢。

class Ball:
          
    def __init__(self,x,y,vx,vy,c):
        self.x=x
        self.y=y
        self.vx=vx
        self.vy=vy
        self.color=c
        return
    @classmethod
    def update(self,w,h,t):
        time = float(t)/float(1000000)
        #print(time)
        xp = float(self.vx)*float(time)
        yp= float(self.vy)*float(time)
        self.x += xp
        self.y += yp
        #print (str(xp) +"," +str(yp))
        if self.x<32:
            self.vx = 0 - self.vx
            self.x += (32-self.x)
        if self.y<32:
            self.vy = 0 - self.vy
            self.y += (32-self.y)
        if self.x+32>w:
            self.vx = 0 - self.vx
            self.x -= (self.x+32)-w
        if self.y+32>h:
            self.vy = 0 - self.vy
            self.y -= (self.y+32)-h
        return

class通过以下方法更新

def play_u(self):
    t = self.gt.elapsed_time()
    self.gt.set_timer()
    for i in self.balls:
        i.update(self.width,self.height,t)
    return

这里有一个关于如何针对具有相同参数的多个 Ball 对象并行调用 update 的想法。这里我使用 multiprocessing.pool.Pool class.

因为Python serializes/de-serializes 从主进程到池中将要执行任务的进程的Ball 对象,对对象的任何修改都不会反映回来在主进程中“存在”的对象副本中(如您所见)。但这并不能阻止 update 返回已修改的更新属性列表(或 元组 ),主进程可用于更新其对象副本。

class Ball:
    # If this is a class constant, then it can and should stay here:
    radius = 32

    def __init__(self, x, y, vx, vy, c):
        self.x = x
        self.y = y
        self.vx = vx
        self.vy = vy
        self.color = c
        return

    def update(self, w, h, t):
        time = float(t) / 1000000.0
        #print(time)
        xp = float(self.vx) * float(time)
        yp = float(self.vy) * float(time)
        self.x += xp
        self.y += yp
        #print (str(xp) +"," +str(yp))
        if self.x < 32:
            self.vx = 0 - self.vx
            self.x += (32 - self.x)
        if self.y < 32:
            self.vy = 0 - self.vy
            self.y += (32 - self.y)
        if self.x + 32 > w:
            self.vx = 0 - self.vx
            self.x -= (self.x + 32) - w
        if self.y + 32 > h:
            self.vy = 0 - self.vy
            self.y -= (self.y + 32) - h
        # Return tuple of attributes that have changed
        # (Not used by serial benchmark)
        return (self.x, self.y, self.vx, self.vy)

    def __repr__(self):
        """
        Return internal dictionary of attributes as a string
        """
        return str(self.__dict__)

def prepare_benchmark():
    balls = [Ball(1, 2, 3, 4, 5) for _ in range(1000)]
    arg_list = (3.0, 4.0, 1.0)
    return balls, arg_list

def serial(balls, arg_list):
    for ball in balls:
        ball.update(*arg_list)

def parallel_updater(arg_list, ball):
    return ball.update(*arg_list)

def parallel(pool, balls, arg_list):
    from functools import partial

    worker = partial(parallel_updater, arg_list)
    results = pool.map(worker, balls)
    for idx, result in enumerate(results):
        ball = balls[idx]
        # unpack:
        ball.x, ball.y, ball.vx, ball.vy = result

def parallel2(pool, balls, arg_list):
    results = [pool.apply_async(ball.update, args=arg_list) for ball in balls]
    for idx, result in enumerate(results):
        ball = balls[idx]
        # unpack:
        ball.x, ball.y, ball.vx, ball.vy = result.get()

def main():
    import time

    # Serial performance:
    balls, arg_list = prepare_benchmark()
    t = time.perf_counter()
    serial(balls, arg_list)
    elapsed = time.perf_counter() - t
    print(balls[0])
    print('Serial elapsed time:', elapsed)

    print()
    print('-'*80)
    print()

    # Parallel performance using map
    # We won't even include the time it takes to create the pool
    from multiprocessing import Pool
    pool = Pool() # pool size is 8 on my desktop
    balls, arg_list = prepare_benchmark()
    t = time.perf_counter()
    parallel(pool, balls, arg_list)
    elapsed = time.perf_counter() - t
    print(balls[0])
    print('Parallel elapsed time:', elapsed)

    print()
    print('-'*80)
    print()

    # Parallel performance using apply_async
    balls, arg_list = prepare_benchmark()
    t = time.perf_counter()
    parallel2(pool, balls, arg_list)
    elapsed = time.perf_counter() - t
    print(balls[0])
    print('Parallel2 elapsed time:', elapsed)


    pool.close()
    pool.join()


# Required for windows
if __name__ == '__main__':
    main()

打印:

{'x': -29.0, 'y': -28.0, 'vx': 3, 'vy': 4, 'color': 5}
Serial elapsed time: 0.0018328999999999984

--------------------------------------------------------------------------------

{'x': -29.0, 'y': -28.0, 'vx': 3, 'vy': 4, 'color': 5}
Parallel elapsed time: 0.236945

--------------------------------------------------------------------------------

{'x': -29.0, 'y': -28.0, 'vx': 3, 'vy': 4, 'color': 5}
Parallel2 elapsed time: 0.1460790000000000

我对所有事情都使用了无意义的参数,但是你可以看到处理 serialization/deserialization 和更新主进程对象的开销无法通过并行处理 1,000 个调用来补偿,当你有这样一个微不足道的工作者函数为 update.

请注意,在这种情况下,使用方法 apply_async 的基准测试 Parallel2 实际上比使用方法 map 的基准测试 Parallel 的性能更高,这有点令人惊讶。我的猜测是,这部分是由于必须使用方法 functools.partial 以以下形式传达额外的、不变的 wht 参数arg_list 到工作函数 parallel_updater,它提供了所需的附加函数调用。因此,基准测试 Parallel 必须为每次球更新总共进行两次函数调用。