如何使用dask bag和delayed to join 2个映射函数?
How to use dask bag and delayed to join 2 mapping functions?
我有 2 个函数:find_components 和 processing_partition_component
import random
import dask.bag as db
def find_components(partition):
# it will return a list of components
return [x for x in range(1, random.randint(1,10))]
def processing_partition_component(part_comp):
print("processing %s" % part_comp)
partitions=['2','3','4']
我想在分区上计算 find_components(),然后获取每个分区的输出来为 processing_partition_component() 生成任务。并且计算不应等待所有 find_coponents() 完成。换句话说,processing_partition_component() 应该在 processing_partition 之一 finished.I 尝试过之后立即调用,但这不是我想要的:
db.from_sequence(partitions, partition_size=1).map(find_components).map(processing_partition_component).compute()
# Output:
processing [1, 2, 3, 4, 5]
processing [1, 2]
processing [1, 2, 3, 4, 5, 6, 7, 8, 9]
您可以看到 processing_partition_component() 将 find_components() 的整个输出作为示例输入:[1, 2, 3, 4, 5]。我想要的是任务应该在 find_components() 之后展开,每个 processing_partition_component() 应该只采用 1 个元素,如 1、2、3、4 或 5。预期的打印输出是
processing 1
processing 2
processing 3
....
processing 1 # from another output of find_components
...
如果是多线程的话,打印顺序会打乱,所以Processing 1可以连续打印3次
我不知道如何使用 dask.bag 和 dask.delayed 来做到这一点。我正在使用 python3
的最新 dask
谢谢,
Dask 包可以很好地处理发电机
def f(partition):
for x in partition:
yield x + 1
my_bag.map_partitions(f).map(print)
这将为每个元素加一,然后在移动到下一个元素之前打印它
我有 2 个函数:find_components 和 processing_partition_component
import random
import dask.bag as db
def find_components(partition):
# it will return a list of components
return [x for x in range(1, random.randint(1,10))]
def processing_partition_component(part_comp):
print("processing %s" % part_comp)
partitions=['2','3','4']
我想在分区上计算 find_components(),然后获取每个分区的输出来为 processing_partition_component() 生成任务。并且计算不应等待所有 find_coponents() 完成。换句话说,processing_partition_component() 应该在 processing_partition 之一 finished.I 尝试过之后立即调用,但这不是我想要的:
db.from_sequence(partitions, partition_size=1).map(find_components).map(processing_partition_component).compute()
# Output:
processing [1, 2, 3, 4, 5]
processing [1, 2]
processing [1, 2, 3, 4, 5, 6, 7, 8, 9]
您可以看到 processing_partition_component() 将 find_components() 的整个输出作为示例输入:[1, 2, 3, 4, 5]。我想要的是任务应该在 find_components() 之后展开,每个 processing_partition_component() 应该只采用 1 个元素,如 1、2、3、4 或 5。预期的打印输出是
processing 1
processing 2
processing 3
....
processing 1 # from another output of find_components
...
如果是多线程的话,打印顺序会打乱,所以Processing 1可以连续打印3次
我不知道如何使用 dask.bag 和 dask.delayed 来做到这一点。我正在使用 python3
的最新 dask谢谢,
Dask 包可以很好地处理发电机
def f(partition):
for x in partition:
yield x + 1
my_bag.map_partitions(f).map(print)
这将为每个元素加一,然后在移动到下一个元素之前打印它