Asynchronous multiprocessing in Python with pool apply_async
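I want to process a temporal graph (essentially a list of networkx graphs) in parallel on a shared-memory machine, using asynchronous parallelism. To do this, I use Pool.apply_async() from the multiprocessing module. The temporal graph consists of 5 unit (snapshot) graphs, and for each unit graph I perform several computationally expensive matrix operations.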
First, consider a simple sequential example:
#------------------------------------
# Constants
#------------------------------------
NV = 100 # No. of vertices
NE = 25 # No. of edges
NG = 5 # No. of unit graphs
#------------------------------------
# Generate random time-varying graph
#------------------------------------
Gt = gen_time_graph(NV, NE, NG)
# Snapshot index
k = 0
# for each unit graph
for Gk in Gt:
    # Temporal adjacency matrix
    Atk = adj_mtrx(Gk)
    # Temporal weight matrix
    # ...
    # Temporal eigenvector centrality
    # ...
    k += 1
It works perfectly. Next, I tried to assign each matrix operation to a worker in the pool:
#------------------------------------
# Constants
#------------------------------------
NV = 100 # No. of vertices
NE = 25 # No. of edges
NG = 5 # No. of unit graphs
NP = 2 # No. of processes
#------------------------------------
# Generate random time-varying graph
#------------------------------------
Gt = gen_time_graph(NV, NE, NG)
# Snapshot index
k = 0
if __name__ == '__main__':
    with Pool(processes=NP) as pool:
        # for each unit graph
        for Gk in Gt:
            # Temporal adjacency matrix
            Atk = pool.apply_async( adj_mtrx, (Gk) ).get()
            # Temporal weight matrix
            # ...
            # Temporal eigenvector centrality
            # ...
            k += 1
However, this code crashes with the following error:
multiprocessing.pool.RemoteTraceback:
"""
Traceback (most recent call last):
File "/usr/lib/python3.8/multiprocessing/pool.py", line 125, in worker
result = (True, func(*args, **kwds))
TypeError: adj_mtrx() takes 1 positional argument but 100 were given
"""
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "./aggr_vs_time_dat_par_mini.py", line 100, in <module>
Atk = pool.apply_async( adj_mtrx, (Gk) ).get()
File "/usr/lib/python3.8/multiprocessing/pool.py", line 771, in get
raise self._value
TypeError: adj_mtrx() takes 1 positional argument but 100 were given
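I need help debugging the problem. It looks as if the graph Gk is taken apart by Pool and passed to the function as a set of vertices. I would also appreciate any comments on whether my general approach to parallelization with multiprocessing Pool.apply_async() is appropriate.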
Below is all the code needed for a minimal working example:
import networkx as nx
import random as rnd
import numpy as np
from multiprocessing import Pool
# Generates random graph
def gen_rnd_graph(nv, ne):
    # Create random list of sources
    Vsrc = [rnd.randint(0,nv-1) for iter in range(ne)]
    # Create random list of sinks
    Vsnk = [rnd.randint(0,nv-1) for iter in range(ne)]
    # Create random list of edge weights
    U = [rnd.random() for iter in range(ne)]
    # Create list of tuples {Vsrc, Vsnk, U}
    T = list(zip(Vsrc,Vsnk,U))
    # Create graph
    G = nx.Graph()
    # Create list of vertices
    V = list(range(nv))
    # Add nodes to graph
    G.add_nodes_from(V)
    # Add edges between random vertices with random edge weights
    G.add_weighted_edges_from(T)
    return G
# Generates time-varying graph
def gen_time_graph(nv, ne, ng):
    # Initialise list of graphs
    l = []
    for i in range(ng):
        gi = gen_rnd_graph(nv, ne)
        l.append(gi)
    return l
# Computes adjacency matrix for snapshot of time-varying graph
def adj_mtrx(Gk):
    # no. of vertices
    n = Gk.number_of_nodes()
    # adjacency matrix
    Ak = np.zeros([n,n])
    # for each vertex
    for i in range(n):
        for j in range(n):
            if Gk.has_edge(i,j): Ak[i,j] = 1
    return Ak
#------------------------------------
# Constants
#------------------------------------
NV = 100 # No. of vertices
NE = 25 # No. of edges
NG = 5 # No. of unit graphs
NP = 2 # No. of processes
#------------------------------------
# Generate random time-varying graph
#------------------------------------
Gt = gen_time_graph(NV, NE, NG)
# Snapshot index
k = 0
if __name__ == '__main__':
    with Pool(processes=NP) as pool:
        # for each unit graph
        for Gk in Gt:
            # Temporal adjacency matrix
            Atk = pool.apply_async( adj_mtrx, (Gk) ).get()
            k += 1
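According to the documentation of apply_async, its signature is
apply_async(func[, args[, kwds[, callback[, error_callback]]]])
so args must be a sequence of positional arguments, and you need to pass Gk as a tuple, i.e. (Gk,):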
Atk = pool.apply_async( adj_mtrx, (Gk,) ).get()
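Regarding the general approach: calling .get() immediately after each apply_async() blocks until that one task has finished, so the loop above still processes one snapshot at a time and the second worker mostly sits idle. A minimal sketch of the usual alternative, submitting every task first and collecting the results afterwards, follows. The names snapshot_work and collect_async and the edge-list "snapshots" are hypothetical stand-ins, not part of the original code; in the question the items would be the graphs in Gt and the function would be adj_mtrx.

```python
from multiprocessing import Pool


def snapshot_work(edges):
    # Hypothetical stand-in for adj_mtrx: it receives one snapshot (here a
    # plain list of edges) as its single positional argument.
    return len(edges)


def collect_async(pool, func, items):
    # Submit every task first, so all workers can be busy at the same time.
    # Note the 1-tuple (item,): the worker calls func(*args).
    pending = [pool.apply_async(func, (item,)) for item in items]
    # Only now block on the results; they come back in submission order.
    return [res.get() for res in pending]


if __name__ == "__main__":
    # Hypothetical snapshots standing in for the graphs in Gt.
    snapshots = [[(0, 1)], [(0, 1), (1, 2)], [(2, 3), (3, 4), (4, 0)]]
    with Pool(processes=2) as pool:
        print(collect_async(pool, snapshot_work, snapshots))  # [1, 2, 3]
```

Pool.map(adj_mtrx, Gt) would return the same ordered list in a single call, because map passes each item as a single argument. Whether either version is actually faster than the sequential loop depends on whether the per-snapshot work outweighs the cost of pickling each graph and sending it to a worker; for 100-vertex graphs that overhead may well dominate.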
Background
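Because (Gk) is not a tuple but just Gk itself, the worker effectively calls adj_mtrx(*Gk). Unpacking a networkx graph with * iterates over its nodes, so the function receives one positional argument per node (100 in your case, hence the error message). For example: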
import networkx as nx
g = nx.karate_club_graph()
print(*g)
# 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
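The same failure can be reproduced without a pool or networkx. In this sketch TinyGraph and adj_mtrx_stub are hypothetical stand-ins: TinyGraph only mimics the one property that matters here, namely that iterating over a networkx graph yields its nodes.

```python
class TinyGraph:
    """Hypothetical stand-in for a networkx graph: iterating yields its nodes."""

    def __init__(self, nodes):
        self.node_list = list(nodes)

    def __iter__(self):
        return iter(self.node_list)


def adj_mtrx_stub(Gk):
    # Hypothetical one-argument function, shaped like adj_mtrx in the question.
    return len(Gk.node_list)


g = TinyGraph(range(3))
args = (g)            # parentheses alone do not build a tuple: args is g
print(args is g)      # True

try:
    adj_mtrx_stub(*args)  # what the worker does: func(*args) unpacks the nodes
except TypeError as exc:
    message = str(exc)
    print(message)    # adj_mtrx_stub() takes 1 positional argument but 3 were given

print(adj_mtrx_stub(*(g,)))  # 3: the 1-tuple unpacks to exactly one argument
```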
Tuples of length 1 and 0
For more details on creating tuples with 0 or 1 elements, see "How to create a tuple with only one element", or go directly to the corresponding section in the Python documentation.
Basically, () creates a tuple of length 0 and (Gk,) creates a tuple of length 1; for any larger number of elements you can use either (x_1, ..., x_n) or (x_1, ..., x_n,).
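A quick check of these rules in plain Python, with the string "Gk" standing in for the graph:

```python
empty = ()                  # length 0
one = ("Gk",)               # length 1: the trailing comma makes the tuple
not_a_tuple = ("Gk")        # just the string "Gk" in parentheses
many = (1, 2, 3)
many_trailing = (1, 2, 3,)  # the trailing comma is optional here

print(len(empty), len(one))        # 0 1
print(type(not_a_tuple).__name__)  # str
print(many == many_trailing)       # True
```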
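The *-operator
The *-operator can be used to accept an arbitrary number of positional arguments in a function definition, and to unpack an iterable into positional arguments in a call. See the Python documentation and the section before it. Similarly, you can use ** for an arbitrary number of keyword arguments. For more details, take a look at "What does the star operator mean, in a function call?" and the duplicates listed in that question.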
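Both directions of * and ** can be seen side by side in the sketch below. call_like_worker is a hypothetical helper that only imitates the line func(*args, **kwds) shown in the RemoteTraceback; it is not the actual multiprocessing implementation.

```python
def report(*args, **kwargs):
    # *args gathers extra positional arguments into a tuple,
    # **kwargs gathers extra keyword arguments into a dict.
    return args, kwargs


def call_like_worker(func, args=(), kwds=None):
    # Mirrors func(*args, **kwds) from the traceback:
    # args is unpacked into positional arguments, kwds into keyword arguments.
    return func(*args, **(kwds or {}))


print(report(1, 2, mode="fast"))                           # ((1, 2), {'mode': 'fast'})
print(call_like_worker(report, (1, 2), {"mode": "fast"}))  # ((1, 2), {'mode': 'fast'})
# A non-tuple iterable is unpacked element by element, just like the graph:
print(call_like_worker(report, "ab"))                      # (('a', 'b'), {})
```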