Parallelize Python Dictionary Comprehension

I'm trying to parallelize subsetting a Python dictionary. The code below creates a new dictionary, positions_sub, based on whether the keys in the positions dictionary are found in the list node_list:
positions_sub = {}
for k,v in positions.items():
    if k in node_list:
        positions_sub[k] = v

This code does exactly what I want. However, it takes a while to run, so I'm trying to parallelize it. I attempted this in the code below, but it returns positions_sub as a list of dictionaries, which isn't what I want. There are also some issues with the number of values per key. Any ideas how to get this working? Thanks!

from joblib import Parallel, delayed

def dict_filter(k,v):
    if k in node_list:
        positions_sub[k] = v
    return positions_sub
positions_sub = Parallel(n_jobs=-1)(delayed(dict_filter)(k, v) for k, v in positions.items())

Before resorting to parallelization, you should make sure you're using the right data structure for each task: remember that x in list is essentially O(n), whereas x in set (and also x in dict) is more like O(1). Therefore, simply converting your node_list to a set can improve performance enormously.

node_list = set(node_list)
positions_sub = {}
for k,v in positions.items():
    if k in node_list:
        positions_sub[k] = v

Another thing to consider is the ratio between len(positions) and len(node_list). If one is much smaller than the other, you should always iterate over the smaller one.
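For illustration, assuming node_list happens to be the smaller collection, iterating it directly (instead of scanning all of positions) might look like this, with toy data:

```python
# Toy data for illustration only
positions = {"a": (0, 0), "b": (1, 2), "c": (3, 4)}
node_list = ["a", "c"]

# Iterate over the (smaller) node_list and look each key up in positions,
# rather than iterating all of positions and testing membership in node_list.
positions_sub = {k: positions[k] for k in node_list if k in positions}
print(positions_sub)  # {'a': (0, 0), 'c': (3, 4)}
```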


EDIT: some code for performance comparisons

import random
import timeit
import functools

def generate(n_positions=1000, n_node_list=100):
    positions = { i:i for i in random.sample(range(n_positions), n_positions) }
    node_list = random.sample(range(max(n_positions, n_node_list)), n_node_list)
    return positions, node_list  

def validate(variant):
    data = generate(1000, 100)
    if sorted(data[1]) != sorted(k for k in variant(*data)):
        raise Exception(f"{variant.__name__} failed")

def measure(variant, data, repeats=1000):
    total_seconds = timeit.Timer(functools.partial(variant, *data)).timeit(repeats)
    average_ms = total_seconds / repeats * 1000
    print(f"{variant.__name__:10s} took an average of {average_ms:0.2f}ms per pass over {repeats} passes" )   


def variant1(positions, node_list):
    positions_sub = {}
    for k,v in positions.items():
        if k in node_list:
            positions_sub[k] = v
    return positions_sub

def variant1b(positions, node_list):
    node_list = set(node_list)
    positions_sub = {}
    for k,v in positions.items():
        if k in node_list:
            positions_sub[k] = v
    return positions_sub

def variant2(positions, node_list):
    return {k:v for k,v in positions.items() if k in node_list}

def variant2b(positions, node_list):
    node_list = set(node_list)
    return {k:v for k,v in positions.items() if k in node_list}

def variant3(positions, node_list):
    return {k:positions[k] for k in node_list if k in positions}



if __name__ == "__main__":
    variants = [variant1,variant1b,variant2,variant2b,variant3]
    for variant in variants:
        validate(variant)      

    n_positions = 4000
    n_node_list = 1000
    n_repeats = 100
    data = generate(n_positions, n_node_list)
    print(f"data generated with len(positions)={n_positions} and len(node_list)={n_node_list}")
    for variant in variants:
        measure(variant, data, n_repeats)

EDIT2: as requested, here are some results from my machine

first run:
data generated with len(positions)=4000 and len(node_list)=1000
variant1   took an average of 6.90ms per pass over 100 passes
variant1b  took an average of 0.22ms per pass over 100 passes
variant2   took an average of 6.95ms per pass over 100 passes
variant2b  took an average of 0.12ms per pass over 100 passes
variant3   took an average of 0.19ms per pass over 100 passes

second run:
data generated with len(positions)=40000 and len(node_list)=10000
variant1   took an average of 738.23ms per pass over 10 passes
variant1b  took an average of   2.04ms per pass over 10 passes
variant2   took an average of 739.51ms per pass over 10 passes
variant2b  took an average of   1.52ms per pass over 10 passes
variant3   took an average of   1.85ms per pass over 10 passes

Note that n=len(positions) and m=len(node_list) were chosen so that the ratio n/m=4 roughly matches that of the original data, which the OP specified as 1.2M for n and 300K for m.

Observe the effect of scaling up by a factor of 10 from the first run to the second: in the first run, variant1b is about 31 times faster than variant1, while in the second run it is 361 times faster! This is the expected result of reducing the complexity of k in node_list from O(m) to O(1). The total time complexity of variant1 is n*m = 0.25*n^2 = O(n^2), while variant1b has only n*1 = O(n). This means that for every order of magnitude that n increases, variant1b also becomes an order of magnitude faster than variant1.

It is rather doubtful that a comparable performance improvement could be achieved by parallelization alone: by and large, the expected gain for an embarrassingly parallelizable problem is a multiple of the available CPUs, which is still a constant factor and nowhere near the gain of improving the algorithm from O(n^2) to O(n).

Furthermore, while IMHO the given problem falls into the class of parallelizable problems, the output must be aggregated after the parallel processing before it can be used. Also, I'm not very familiar with joblib, which is why I have omitted it from the comparison.
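For completeness, here is a hedged sketch of the return-and-aggregate pattern described above, using the stdlib ThreadPoolExecutor instead of joblib: each task returns a (key, value) pair or None rather than mutating a shared dictionary, and the results are merged afterwards. The data below is made up for illustration, and for a simple membership test the coordination overhead will likely outweigh any gain.

```python
from concurrent.futures import ThreadPoolExecutor

positions = {i: i * 10 for i in range(8)}   # toy data for illustration
node_set = {1, 3, 5}                        # a set, for O(1) membership tests

def dict_filter(item):
    k, v = item
    # Return the pair if the key qualifies, None otherwise;
    # workers return results instead of mutating shared state.
    return (k, v) if k in node_set else None

with ThreadPoolExecutor(max_workers=4) as pool:
    pairs = pool.map(dict_filter, positions.items())

# Aggregate the per-task results back into a single dictionary.
positions_sub = dict(p for p in pairs if p is not None)
print(positions_sub)  # {1: 10, 3: 30, 5: 50}
```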

You can use asyncio. (Documentation can be found [here][1].) It is used as a foundation for multiple Python asynchronous frameworks that provide high-performance network and web servers, database connection libraries, distributed task queues, etc. In addition, it has both high-level and low-level APIs to accommodate any kind of problem.

import asyncio
import functools

def background(f):
    def wrapped(*args, **kwargs):
        # run_in_executor only forwards positional args, so bind kwargs via partial
        return asyncio.get_event_loop().run_in_executor(None, functools.partial(f, *args, **kwargs))

    return wrapped

@background
def your_function(argument):
    #code

Now this function will run in parallel whenever it is called, without putting the main program into a wait state. You can also use it to parallelize a for loop. When a for loop is called, the loop itself is sequential, but each iteration runs in parallel with the main program as soon as the interpreter gets there.

For your specific case, you can do:

import asyncio
import functools
import time


def background(f):
    def wrapped(*args, **kwargs):
        # run_in_executor only forwards positional args, so bind kwargs via partial
        return asyncio.get_event_loop().run_in_executor(None, functools.partial(f, *args, **kwargs))
    return wrapped
    
@background
def add_to_dictionary(k,v):
    time.sleep(1) # Added Sleep to better demonstrate parallelization
    print(f"function called for {k=}\n", end='')
    if k in node_list:
        positions_sub[k] = v

# Random data to demonstrate parallelization
positions = {i:i for i in range(20)}
node_list = [key for key in positions if not key%3 or not key%5]
print(f"{positions=}, {node_list=}")

positions_sub = dict()

loop = asyncio.get_event_loop() # Have a new event loop

looper = asyncio.gather(*[add_to_dictionary(k,v) for k, v in positions.items()])  
# Run the loop
                             
results = loop.run_until_complete(looper) # Wait until finish


print('loop finished')
print(f"{positions_sub=}")

This produces the following output:

positions={0: 0, 1: 1, 2: 2, 3: 3, 4: 4, 5: 5, 6: 6, 7: 7, 8: 8, 9: 9, 10: 10, 11: 11, 12: 12, 13: 13, 14: 14, 15: 15, 16: 16, 17: 17, 18: 18, 19: 19},
node_list=[0, 3, 5, 6, 9, 10, 12, 15, 18]
function called for k=0
function called for k=6
function called for k=5
function called for k=4
function called for k=2
function called for k=1
function called for k=3
function called for k=7
function called for k=11
function called for k=10
function called for k=8
function called for k=15
function called for k=14
function called for k=12
function called for k=9
function called for k=13
function called for k=19
function called for k=18
function called for k=17
function called for k=16
loop finished
positions_sub={3: 3, 6: 6, 5: 5, 0: 0, 10: 10, 15: 15, 9: 9, 12: 12, 18: 18}