并行化列表过滤
Parallelizing list filtering
我有一个项目列表,需要根据某些条件进行筛选。我想知道 Dask 是否可以并行执行此过滤,因为列表很长(几千万条记录)。
基本上,我需要做的是:
items = [
{'type': 'dog', 'weight': 10},
{'type': 'dog', 'weight': 20},
{'type': 'cat', 'weight': 15},
{'type': 'dog', 'weight': 30},
]
def item_is_valid(item):
item_is_valid = True
if item['type']=='cat':
item_is_valid = False
elif item['weight']>20:
item_is_valid = False
# ...
# elif for n conditions
return item_is_valid
items_filtered = [item for item in items if item_is_valid(item)]
使用 Dask,我实现了以下目标:
def item_is_valid_v2(item):
"""Return the whole item if valid."""
item_is_valid = True
if item['type']=='cat':
item_is_valid = False
elif item['weight']>20:
item_is_valid = False
# ...
# elif for n conditions
if item_is_valid:
return item
results = []
item = []
for item in items:
delayed = dask.delayed(item_is_valid)(item)
results.append(delayed)
results = dask.compute(*results)
但是,我得到的结果包含一些 None
值,然后需要以非并行方式以某种方式过滤掉这些值。
({'type': 'dog', 'weight': 10}, {'type': 'dog', 'weight': 20}, None, None)
也许 bag
API 对你有用,这是一个粗略的伪代码:
import dask.bag as db
bag = db.from_sequence() # or better yet read it from disk
result = bag.filter(item_is_valid) # note this uses the first version (bool)
要检查这是否有效,请检查 result.take(5)
的结果,如果结果令人满意:
computed_result = result.compute()
我有一个项目列表,需要根据某些条件进行筛选。我想知道 Dask 是否可以并行执行此过滤,因为列表很长(几千万条记录)。
基本上,我需要做的是:
items = [
{'type': 'dog', 'weight': 10},
{'type': 'dog', 'weight': 20},
{'type': 'cat', 'weight': 15},
{'type': 'dog', 'weight': 30},
]
def item_is_valid(item):
item_is_valid = True
if item['type']=='cat':
item_is_valid = False
elif item['weight']>20:
item_is_valid = False
# ...
# elif for n conditions
return item_is_valid
items_filtered = [item for item in items if item_is_valid(item)]
使用 Dask,我实现了以下目标:
def item_is_valid_v2(item):
"""Return the whole item if valid."""
item_is_valid = True
if item['type']=='cat':
item_is_valid = False
elif item['weight']>20:
item_is_valid = False
# ...
# elif for n conditions
if item_is_valid:
return item
results = []
item = []
for item in items:
delayed = dask.delayed(item_is_valid)(item)
results.append(delayed)
results = dask.compute(*results)
但是,我得到的结果包含一些 None
值,然后需要以非并行方式以某种方式过滤掉这些值。
({'type': 'dog', 'weight': 10}, {'type': 'dog', 'weight': 20}, None, None)
也许 bag
API 对你有用,这是一个粗略的伪代码:
import dask.bag as db
bag = db.from_sequence() # or better yet read it from disk
result = bag.filter(item_is_valid) # note this uses the first version (bool)
要检查这是否有效,请检查 result.take(5)
的结果,如果结果令人满意:
computed_result = result.compute()