Parallelize Python Dictionary Comprehension
I am trying to parallelize the subsetting of a Python dictionary. The code below creates a new dictionary, positions_sub, based on whether the keys in the positions dictionary are found in the list node_list:
positions_sub = {}
for k, v in positions.items():
    if k in node_list:
        positions_sub[k] = v
This code does exactly what I want. However, it takes a while to run, so I am trying to parallelize it. I attempted this in the code below, but it returns positions_sub as a list of dictionaries, which is not what I want. There are also some issues with the number of values per key. Any ideas how to get this working? Thanks!
from joblib import Parallel, delayed

def dict_filter(k, v):
    if k in node_list:
        positions_sub[k] = v
    return positions_sub

positions_sub = Parallel(n_jobs=-1)(delayed(dict_filter)(k, v) for k, v in positions.items())
Before resorting to parallelization, you should make sure you are using the right data structure for each task: remember that x in list is essentially O(n), whereas x in set (and also x in dict) is more like O(1). Therefore, simply converting your node_list to a set can improve the performance tremendously.
node_list = set(node_list)
positions_sub = {}
for k, v in positions.items():
    if k in node_list:
        positions_sub[k] = v
Another thing to consider is the ratio between len(positions) and len(node_list). If one is much smaller than the other, you should always iterate over the smaller one, as the sketch below illustrates.
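For illustration, a minimal sketch of both directions on some toy data (the data and the node_set name are assumptions for this example, not the OP's real inputs):

# Toy data, assumed purely for illustration
positions = {i: i * 10 for i in range(1000)}
node_list = [0, 5, 42]

node_set = set(node_list)  # O(1) membership tests

# node_list is much smaller here, so iterate over it:
positions_sub = {k: positions[k] for k in node_set if k in positions}

# Iterating over the larger positions gives the same result,
# but touches every one of its items:
assert positions_sub == {k: v for k, v in positions.items() if k in node_set}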
EDIT: Some code for the performance comparison
import random
import timeit
import functools

def generate(n_positions=1000, n_node_list=100):
    positions = {i: i for i in random.sample(range(n_positions), n_positions)}
    node_list = random.sample(range(max(n_positions, n_node_list)), n_node_list)
    return positions, node_list

def validate(variant):
    data = generate(1000, 100)
    if sorted(data[1]) != sorted(k for k in variant(*data)):
        raise Exception(f"{variant.__name__} failed")

def measure(variant, data, repeats=1000):
    total_seconds = timeit.Timer(functools.partial(variant, *data)).timeit(repeats)
    average_ms = total_seconds / repeats * 1000
    print(f"{variant.__name__:10s} took an average of {average_ms:0.2f}ms per pass over {repeats} passes")

def variant1(positions, node_list):
    positions_sub = {}
    for k, v in positions.items():
        if k in node_list:
            positions_sub[k] = v
    return positions_sub

def variant1b(positions, node_list):
    node_list = set(node_list)
    positions_sub = {}
    for k, v in positions.items():
        if k in node_list:
            positions_sub[k] = v
    return positions_sub

def variant2(positions, node_list):
    return {k: v for k, v in positions.items() if k in node_list}

def variant2b(positions, node_list):
    node_list = set(node_list)
    return {k: v for k, v in positions.items() if k in node_list}

def variant3(positions, node_list):
    return {k: positions[k] for k in node_list if k in positions}

if __name__ == "__main__":
    variants = [variant1, variant1b, variant2, variant2b, variant3]
    for variant in variants:
        validate(variant)
    n_positions = 4000
    n_node_list = 1000
    n_repeats = 100
    data = generate(n_positions, n_node_list)
    print(f"data generated with len(positions)={n_positions} and len(node_list)={n_node_list}")
    for variant in variants:
        measure(variant, data, n_repeats)
EDIT2: As requested, here are some results from my machine
first run:
data generated with len(positions)=4000 and len(node_list)=1000
variant1 took an average of 6.90ms per pass over 100 passes
variant1b took an average of 0.22ms per pass over 100 passes
variant2 took an average of 6.95ms per pass over 100 passes
variant2b took an average of 0.12ms per pass over 100 passes
variant3 took an average of 0.19ms per pass over 100 passes
second run:
data generated with len(positions)=40000 and len(node_list)=10000
variant1 took an average of 738.23ms per pass over 10 passes
variant1b took an average of 2.04ms per pass over 10 passes
variant2 took an average of 739.51ms per pass over 10 passes
variant2b took an average of 1.52ms per pass over 10 passes
variant3 took an average of 1.85ms per pass over 10 passes
Note that n=len(positions) and m=len(node_list) were chosen such that the ratio n/m=4 roughly corresponds to that of the original data, which the OP specified as 1.2M for n and 300K for m.
Observe the effect of scaling up by a factor of 10 from the first run to the second: in the first run, variant1b is about 31 times faster than variant1, while in the second run it is 361 times faster! This is the expected result of reducing the complexity of k in node_list from O(m) to O(1). The total time complexity of variant1 is n*m = 0.25*n^2 = O(n^2), while variant1b has only n*1 = O(n). This means that for every order of magnitude n grows, variant1b gains another order of magnitude over variant1.
It is rather questionable that a similar performance improvement could be achieved by parallelization alone, since by and large the expected gain for an embarrassingly parallelizable problem is a multiple of the available CPUs, which is still a constant factor: with 8 cores, for example, the best case is roughly an 8x speedup, nowhere near the gain of improving the algorithm from O(n^2) to O(n).
Furthermore, while the given problem admittedly belongs to the class of embarrassingly parallelizable problems, the output must be aggregated after the parallel processing before it is usable. Also, I am not very familiar with joblib, which is why I left it out of the comparison.
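For completeness, a minimal sketch of what such an aggregating joblib version might look like, splitting the items into chunks and merging the partial results afterwards (the helper names and chunk count are assumptions, and this variant has not been benchmarked):

from joblib import Parallel, delayed

def filter_chunk(items, node_set):
    # Filter one chunk of (key, value) pairs into a partial dict.
    return {k: v for k, v in items if k in node_set}

def parallel_filter(positions, node_list, n_chunks=8):
    node_set = set(node_list)
    items = list(positions.items())
    chunk_size = max(1, len(items) // n_chunks)
    chunks = [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]
    partials = Parallel(n_jobs=-1)(delayed(filter_chunk)(chunk, node_set) for chunk in chunks)
    # Aggregate the partial dictionaries into the final result.
    positions_sub = {}
    for partial in partials:
        positions_sub.update(partial)
    return positions_sub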
You can use asyncio (the documentation can be found [here][1]). It serves as the foundation for multiple Python asynchronous frameworks that provide high-performance network and web servers, database connection libraries, distributed task queues, etc. In addition, it has both high-level and low-level APIs to accommodate any kind of problem.
import asyncio
import functools

def background(f):
    def wrapped(*args, **kwargs):
        # run_in_executor only forwards positional arguments,
        # so bind everything with functools.partial first.
        return asyncio.get_event_loop().run_in_executor(
            None, functools.partial(f, *args, **kwargs))
    return wrapped

@background
def your_function(argument):
    # code
Now this function runs in parallel whenever it is called, without putting the main program into a wait state. You can also use it to parallelize a for loop: although the loop itself is sequential, each iteration runs in parallel with the main program as soon as the interpreter reaches it.
For your specific case, you can do this:
import asyncio
import functools
import time

def background(f):
    def wrapped(*args, **kwargs):
        # Bind the arguments, since run_in_executor only takes positional args.
        return asyncio.get_event_loop().run_in_executor(
            None, functools.partial(f, *args, **kwargs))
    return wrapped

@background
def add_to_dictionary(k, v):
    time.sleep(1)  # Added sleep to better demonstrate parallelization
    print(f"function called for {k=}\n", end='')
    if k in node_list:
        positions_sub[k] = v

# Random data to demonstrate parallelization
positions = {i: i for i in range(20)}
node_list = [key for key in positions if not key % 3 or not key % 5]
print(f"{positions=}, {node_list=}")

positions_sub = dict()

loop = asyncio.get_event_loop()  # Have a new event loop
looper = asyncio.gather(*[add_to_dictionary(k, v) for k, v in positions.items()])

# Run the loop
results = loop.run_until_complete(looper)  # Wait until finish

print('loop finished')
print(f"{positions_sub=}")
This produces the following output:
positions={0: 0, 1: 1, 2: 2, 3: 3, 4: 4, 5: 5, 6: 6, 7: 7, 8: 8, 9: 9, 10: 10, 11: 11, 12: 12, 13: 13, 14: 14, 15: 15, 16: 16, 17: 17, 18: 18, 19: 19}, node_list=[0, 3, 5, 6, 9, 10, 12, 15, 18]
function called for k=0
function called for k=6
function called for k=5
function called for k=4
function called for k=2
function called for k=1
function called for k=3
function called for k=7
function called for k=11
function called for k=10
function called for k=8
function called for k=15
function called for k=14
function called for k=12
function called for k=9
function called for k=13
function called for k=19
function called for k=18
function called for k=17
function called for k=16
loop finished
positions_sub={3: 3, 6: 6, 5: 5, 0: 0, 10: 10, 15: 15, 9: 9, 12: 12, 18: 18}
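As a side note, calling asyncio.get_event_loop() outside a running event loop is deprecated as of Python 3.10, so on current interpreters a rough equivalent of the above (an assumed rewrite, not part of the original answer) could use asyncio.run instead:

import asyncio
import functools
import time

def add_to_dictionary(positions_sub, node_list, k, v):
    time.sleep(1)
    if k in node_list:
        positions_sub[k] = v

async def main():
    positions = {i: i for i in range(20)}
    node_list = [key for key in positions if not key % 3 or not key % 5]
    positions_sub = {}
    loop = asyncio.get_running_loop()
    # Schedule every call on the default thread pool and wait for all of them.
    await asyncio.gather(*[
        loop.run_in_executor(None, functools.partial(
            add_to_dictionary, positions_sub, node_list, k, v))
        for k, v in positions.items()
    ])
    print(f"{positions_sub=}")

asyncio.run(main())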