class 的多处理成员函数存储在 python 中的数组中
Multi Processing member function of a class store in array in python
我主要使用 C、C++ 编程,最近将一个项目转换为 python。除了我无法轻松转换多处理。
在示例中,我有一个用球 class 填充的数组,它有一个名为 update 的成员函数,该函数传入了 3 个变量。
下面就是了。它存储在一个名为 balls 的数组中。
我浏览了足够多的 post 文档和视频,但没有找到任何内容来涵盖这方面的一些内容,但没有说明如何处理传入的变量。
理想情况下,我会创建一个进程拉取,让它在它们之间拆分工作。
我需要检索对象并更新原始过程中的对象 space。
不确定,但看起来强制它输出元组然后用所有数据更新 class 并编写另一个函数来更新 class 可能更容易。
欢迎在 python 中提供有关执行此操作的最佳方法的反馈。此外,我更欣赏性能而不是简单地做某事。毕竟这就是这样做的意义所在。
提前致谢。
class Ball:
def __init__(self,x,y,vx,vy,c):
self.x=x
self.y=y
self.vx=vx
self.vy=vy
self.color=c
return
@classmethod
def update(self,w,h,t):
time = float(t)/float(1000000)
#print(time)
xp = float(self.vx)*float(time)
yp= float(self.vy)*float(time)
self.x += xp
self.y += yp
#print (str(xp) +"," +str(yp))
if self.x<32:
self.vx = 0 - self.vx
self.x += (32-self.x)
if self.y<32:
self.vy = 0 - self.vy
self.y += (32-self.y)
if self.x+32>w:
self.vx = 0 - self.vx
self.x -= (self.x+32)-w
if self.y+32>h:
self.vy = 0 - self.vy
self.y -= (self.y+32)-h
return
class通过以下方法更新
def play_u(self):
t = self.gt.elapsed_time()
self.gt.set_timer()
for i in self.balls:
i.update(self.width,self.height,t)
return
这里有一个关于如何针对具有相同参数的多个 Ball
对象并行调用 update
的想法。这里我使用 multiprocessing.pool.Pool
class.
因为Python serializes/de-serializes 从主进程到池中将要执行任务的进程的Ball
对象,对对象的任何修改都不会反映回来在主进程中“存在”的对象副本中(如您所见)。但这并不能阻止 update
返回已修改的更新属性列表(或 元组 ),主进程可用于更新其对象副本。
class Ball:
# If this is a class constant, then it can and should stay here:
radius = 32
def __init__(self, x, y, vx, vy, c):
self.x = x
self.y = y
self.vx = vx
self.vy = vy
self.color = c
return
def update(self, w, h, t):
time = float(t) / 1000000.0
#print(time)
xp = float(self.vx) * float(time)
yp = float(self.vy) * float(time)
self.x += xp
self.y += yp
#print (str(xp) +"," +str(yp))
if self.x < 32:
self.vx = 0 - self.vx
self.x += (32 - self.x)
if self.y < 32:
self.vy = 0 - self.vy
self.y += (32 - self.y)
if self.x + 32 > w:
self.vx = 0 - self.vx
self.x -= (self.x + 32) - w
if self.y + 32 > h:
self.vy = 0 - self.vy
self.y -= (self.y + 32) - h
# Return tuple of attributes that have changed
# (Not used by serial benchmark)
return (self.x, self.y, self.vx, self.vy)
def __repr__(self):
"""
Return internal dictionary of attributes as a string
"""
return str(self.__dict__)
def prepare_benchmark():
balls = [Ball(1, 2, 3, 4, 5) for _ in range(1000)]
arg_list = (3.0, 4.0, 1.0)
return balls, arg_list
def serial(balls, arg_list):
for ball in balls:
ball.update(*arg_list)
def parallel_updater(arg_list, ball):
return ball.update(*arg_list)
def parallel(pool, balls, arg_list):
from functools import partial
worker = partial(parallel_updater, arg_list)
results = pool.map(worker, balls)
for idx, result in enumerate(results):
ball = balls[idx]
# unpack:
ball.x, ball.y, ball.vx, ball.vy = result
def parallel2(pool, balls, arg_list):
results = [pool.apply_async(ball.update, args=arg_list) for ball in balls]
for idx, result in enumerate(results):
ball = balls[idx]
# unpack:
ball.x, ball.y, ball.vx, ball.vy = result.get()
def main():
import time
# Serial performance:
balls, arg_list = prepare_benchmark()
t = time.perf_counter()
serial(balls, arg_list)
elapsed = time.perf_counter() - t
print(balls[0])
print('Serial elapsed time:', elapsed)
print()
print('-'*80)
print()
# Parallel performance using map
# We won't even include the time it takes to create the pool
from multiprocessing import Pool
pool = Pool() # pool size is 8 on my desktop
balls, arg_list = prepare_benchmark()
t = time.perf_counter()
parallel(pool, balls, arg_list)
elapsed = time.perf_counter() - t
print(balls[0])
print('Parallel elapsed time:', elapsed)
print()
print('-'*80)
print()
# Parallel performance using apply_async
balls, arg_list = prepare_benchmark()
t = time.perf_counter()
parallel2(pool, balls, arg_list)
elapsed = time.perf_counter() - t
print(balls[0])
print('Parallel2 elapsed time:', elapsed)
pool.close()
pool.join()
# Required for windows
if __name__ == '__main__':
main()
打印:
{'x': -29.0, 'y': -28.0, 'vx': 3, 'vy': 4, 'color': 5}
Serial elapsed time: 0.0018328999999999984
--------------------------------------------------------------------------------
{'x': -29.0, 'y': -28.0, 'vx': 3, 'vy': 4, 'color': 5}
Parallel elapsed time: 0.236945
--------------------------------------------------------------------------------
{'x': -29.0, 'y': -28.0, 'vx': 3, 'vy': 4, 'color': 5}
Parallel2 elapsed time: 0.1460790000000000
我对所有事情都使用了无意义的参数,但是你可以看到处理 serialization/deserialization 和更新主进程对象的开销无法通过并行处理 1,000 个调用来补偿,当你有这样一个微不足道的工作者函数为 update
.
请注意,在这种情况下,使用方法 apply_async
的基准测试 Parallel2 实际上比使用方法 map
的基准测试 Parallel 的性能更高,这有点令人惊讶。我的猜测是,这部分是由于必须使用方法 functools.partial
以以下形式传达额外的、不变的 w
、h
和 t
参数arg_list
到工作函数 parallel_updater
,它提供了所需的附加函数调用。因此,基准测试 Parallel 必须为每次球更新总共进行两次函数调用。
我主要使用 C、C++ 编程,最近将一个项目转换为 python。除了我无法轻松转换多处理。
在示例中,我有一个用球 class 填充的数组,它有一个名为 update 的成员函数,该函数传入了 3 个变量。
下面就是了。它存储在一个名为 balls 的数组中。 我浏览了足够多的 post 文档和视频,但没有找到任何内容来涵盖这方面的一些内容,但没有说明如何处理传入的变量。
理想情况下,我会创建一个进程拉取,让它在它们之间拆分工作。 我需要检索对象并更新原始过程中的对象 space。
不确定,但看起来强制它输出元组然后用所有数据更新 class 并编写另一个函数来更新 class 可能更容易。
欢迎在 python 中提供有关执行此操作的最佳方法的反馈。此外,我更欣赏性能而不是简单地做某事。毕竟这就是这样做的意义所在。 提前致谢。
class Ball:
def __init__(self,x,y,vx,vy,c):
self.x=x
self.y=y
self.vx=vx
self.vy=vy
self.color=c
return
@classmethod
def update(self,w,h,t):
time = float(t)/float(1000000)
#print(time)
xp = float(self.vx)*float(time)
yp= float(self.vy)*float(time)
self.x += xp
self.y += yp
#print (str(xp) +"," +str(yp))
if self.x<32:
self.vx = 0 - self.vx
self.x += (32-self.x)
if self.y<32:
self.vy = 0 - self.vy
self.y += (32-self.y)
if self.x+32>w:
self.vx = 0 - self.vx
self.x -= (self.x+32)-w
if self.y+32>h:
self.vy = 0 - self.vy
self.y -= (self.y+32)-h
return
class通过以下方法更新
def play_u(self):
t = self.gt.elapsed_time()
self.gt.set_timer()
for i in self.balls:
i.update(self.width,self.height,t)
return
这里有一个关于如何针对具有相同参数的多个 Ball
对象并行调用 update
的想法。这里我使用 multiprocessing.pool.Pool
class.
因为Python serializes/de-serializes 从主进程到池中将要执行任务的进程的Ball
对象,对对象的任何修改都不会反映回来在主进程中“存在”的对象副本中(如您所见)。但这并不能阻止 update
返回已修改的更新属性列表(或 元组 ),主进程可用于更新其对象副本。
class Ball:
# If this is a class constant, then it can and should stay here:
radius = 32
def __init__(self, x, y, vx, vy, c):
self.x = x
self.y = y
self.vx = vx
self.vy = vy
self.color = c
return
def update(self, w, h, t):
time = float(t) / 1000000.0
#print(time)
xp = float(self.vx) * float(time)
yp = float(self.vy) * float(time)
self.x += xp
self.y += yp
#print (str(xp) +"," +str(yp))
if self.x < 32:
self.vx = 0 - self.vx
self.x += (32 - self.x)
if self.y < 32:
self.vy = 0 - self.vy
self.y += (32 - self.y)
if self.x + 32 > w:
self.vx = 0 - self.vx
self.x -= (self.x + 32) - w
if self.y + 32 > h:
self.vy = 0 - self.vy
self.y -= (self.y + 32) - h
# Return tuple of attributes that have changed
# (Not used by serial benchmark)
return (self.x, self.y, self.vx, self.vy)
def __repr__(self):
"""
Return internal dictionary of attributes as a string
"""
return str(self.__dict__)
def prepare_benchmark():
balls = [Ball(1, 2, 3, 4, 5) for _ in range(1000)]
arg_list = (3.0, 4.0, 1.0)
return balls, arg_list
def serial(balls, arg_list):
for ball in balls:
ball.update(*arg_list)
def parallel_updater(arg_list, ball):
return ball.update(*arg_list)
def parallel(pool, balls, arg_list):
from functools import partial
worker = partial(parallel_updater, arg_list)
results = pool.map(worker, balls)
for idx, result in enumerate(results):
ball = balls[idx]
# unpack:
ball.x, ball.y, ball.vx, ball.vy = result
def parallel2(pool, balls, arg_list):
results = [pool.apply_async(ball.update, args=arg_list) for ball in balls]
for idx, result in enumerate(results):
ball = balls[idx]
# unpack:
ball.x, ball.y, ball.vx, ball.vy = result.get()
def main():
import time
# Serial performance:
balls, arg_list = prepare_benchmark()
t = time.perf_counter()
serial(balls, arg_list)
elapsed = time.perf_counter() - t
print(balls[0])
print('Serial elapsed time:', elapsed)
print()
print('-'*80)
print()
# Parallel performance using map
# We won't even include the time it takes to create the pool
from multiprocessing import Pool
pool = Pool() # pool size is 8 on my desktop
balls, arg_list = prepare_benchmark()
t = time.perf_counter()
parallel(pool, balls, arg_list)
elapsed = time.perf_counter() - t
print(balls[0])
print('Parallel elapsed time:', elapsed)
print()
print('-'*80)
print()
# Parallel performance using apply_async
balls, arg_list = prepare_benchmark()
t = time.perf_counter()
parallel2(pool, balls, arg_list)
elapsed = time.perf_counter() - t
print(balls[0])
print('Parallel2 elapsed time:', elapsed)
pool.close()
pool.join()
# Required for windows
if __name__ == '__main__':
main()
打印:
{'x': -29.0, 'y': -28.0, 'vx': 3, 'vy': 4, 'color': 5}
Serial elapsed time: 0.0018328999999999984
--------------------------------------------------------------------------------
{'x': -29.0, 'y': -28.0, 'vx': 3, 'vy': 4, 'color': 5}
Parallel elapsed time: 0.236945
--------------------------------------------------------------------------------
{'x': -29.0, 'y': -28.0, 'vx': 3, 'vy': 4, 'color': 5}
Parallel2 elapsed time: 0.1460790000000000
我对所有事情都使用了无意义的参数,但是你可以看到处理 serialization/deserialization 和更新主进程对象的开销无法通过并行处理 1,000 个调用来补偿,当你有这样一个微不足道的工作者函数为 update
.
请注意,在这种情况下,使用方法 apply_async
的基准测试 Parallel2 实际上比使用方法 map
的基准测试 Parallel 的性能更高,这有点令人惊讶。我的猜测是,这部分是由于必须使用方法 functools.partial
以以下形式传达额外的、不变的 w
、h
和 t
参数arg_list
到工作函数 parallel_updater
,它提供了所需的附加函数调用。因此,基准测试 Parallel 必须为每次球更新总共进行两次函数调用。