并行化列表过滤

Parallelizing list filtering

我有一个项目列表,需要根据某些条件进行筛选。我想知道 Dask 是否可以并行执行此过滤,因为列表很长(几千万条记录)。

基本上,我需要做的是:

items = [
    {'type': 'dog', 'weight': 10},
    {'type': 'dog', 'weight': 20},
    {'type': 'cat', 'weight': 15},
    {'type': 'dog', 'weight': 30},
]

def item_is_valid(item):
    item_is_valid = True

    if item['type']=='cat':
        item_is_valid = False
    elif item['weight']>20:
        item_is_valid = False
    # ...
    # elif for n conditions

    return item_is_valid

items_filtered = [item for item in items if item_is_valid(item)]

使用 Dask,我实现了以下目标:

def item_is_valid_v2(item):
    """Return the whole item if valid."""
    item_is_valid = True

    if item['type']=='cat':
        item_is_valid = False
    elif item['weight']>20:
        item_is_valid = False
    # ...
    # elif for n conditions
    
    if item_is_valid:
        return item

results = []
item = []
for item in items:
    delayed = dask.delayed(item_is_valid)(item)
    results.append(delayed)

results = dask.compute(*results)

但是,我得到的结果包含一些 None 值,然后需要以非并行方式以某种方式过滤掉这些值。

({'type': 'dog', 'weight': 10}, {'type': 'dog', 'weight': 20}, None, None)

也许 bag API 对你有用,这是一个粗略的伪代码:

import dask.bag as db

bag = db.from_sequence() # or better yet read it from disk
result = bag.filter(item_is_valid) # note this uses the first version (bool)

要检查这是否有效,请检查 result.take(5) 的结果,如果结果令人满意:

computed_result = result.compute()