如何使用dask bag和delayed to join 2个映射函数?

How to use dask bag and delayed to join 2 mapping functions?

我有 2 个函数:find_components 和 processing_partition_component

import random
import dask.bag as db

def find_components(partition):
  # it will return a list of components
  return [x for x in range(1, random.randint(1,10))]

def processing_partition_component(part_comp):
  print("processing %s" % part_comp)

partitions=['2','3','4']

我想在分区上计算 find_components(),然后获取每个分区的输出来为 processing_partition_component() 生成任务。并且计算不应等待所有 find_coponents() 完成。换句话说,processing_partition_component() 应该在 processing_partition 之一 finished.I 尝试过之后立即调用,但这不是我想要的:

db.from_sequence(partitions, partition_size=1).map(find_components).map(processing_partition_component).compute()
# Output:
processing [1, 2, 3, 4, 5]
processing [1, 2]
processing [1, 2, 3, 4, 5, 6, 7, 8, 9]

您可以看到 processing_partition_component() 将 find_components() 的整个输出作为示例输入:[1, 2, 3, 4, 5]。我想要的是任务应该在 find_components() 之后展开,每个 processing_partition_component() 应该只采用 1 个元素,如 1、2、3、4 或 5。预期的打印输出是

processing 1
processing 2
processing 3
....
processing 1  # from another output of find_components
...

如果是多线程的话,打印顺序会打乱,所以Processing 1可以连续打印3次

我不知道如何使用 dask.bag 和 dask.delayed 来做到这一点。我正在使用 python3

的最新 dask

谢谢,

Dask 包可以很好地处理发电机

def f(partition):
    for x in partition:
        yield x + 1

my_bag.map_partitions(f).map(print)

这将为每个元素加一,然后在移动到下一个元素之前打印它