在 dask.bag 内改变 objects

mutate objects inside a dask.bag

我正在尝试使用 dask.bag 来保存给定 class 的 object,其中每个实例都捕获文档的各种属性(标题、字数等)。

这个 object 有一些关联的方法可以设置 object 的不同属性。

例如:

import dask.bag as db

class Item:    
    def __init__(self, value):
        self.value = 'My value is: "{}"'.format(value)        
    def modify(self):
        self.value = 'My value used to be: "{}"'.format(self.value)

def generateItems():
    i = 1
    while i <= 100:
        yield(Item(i))
        i += 1

b = db.from_sequence(generateItems())
# looks like:
b.take(1)[0].value #'My value is: "1"'

如何在第一个包 (b) 中为每个 modify-d 个实例创建一个包?

期望输出:'My value used to be: "My value is: "1""'

我试过了:

c = b.map(lambda x: x.modify() )

c.take(1)[0].value 
#AttributeError: 'NoneType' object has no attribute 'value'

# Also tried:
d = b.map(lambda x: x[0].modify() )    
b.take(1) # TypeError: 'Item' object does not support indexing

这里的问题是,c 获取 运行 您的 lambda 函数的结果,而 Item.modify() 没有输出。通常在 Dask 中,您期望 return 基于输入的新对象,而不是改变现有对象 How does dask.delayed handle mutable inputs? - 考虑如果多个任务在多个线程或多个进程中对同一对象进行操作会发生什么。

在这种最简单的情况下,您可以通过在 mutate() 末尾添加 return self 或将 lambda 表达式更改为 x.mutate() and x 来获得您想要的结果;但不要以这种方式编程,而是创建一个具有所需新值的新对象。