将新对象放入 Dask 队列

Put new objects into Dask queue

我想使用 Dask 分布式队列与其他客户端共享一些数据。 我要分享的数据是一个 class 数据的对象: 其中索引是一个列表,数据是一个 dask 集合的未来,名称是一个字符串。

class data:
    index = list()
    data = None

    def __init__(self, name):
        self.name = name 

我使用 register_generic 注册这个要序列化的新对象,如下所示:

from distributed.protocol import register_generic 
register_generic(metadata)
ds = data(name)
ds.data = darray 
ds.index = index
q = Queue("data").put(ds)

但是我得到了这个错误:

TypeError: can not serialize 'data' object

是否有其他可用的方法允许将新对象放入队列? 否则你建议改用什么?

来自队列的文档字符串:

Elements of the Queue must be either Futures or msgpack-encodable data
(ints, strings, lists, dicts).  All data is sent through the scheduler so
it is wise not to send large objects.  To share large objects scatter the
data and share the future instead.

在不扩展 msgpack 来编码你的 class 或发送替代表示(例如,JSON)的情况下,你应该将你的值包装在 Dask 未来中,就像这样

f = client.scatter(ds)
q.put(ds)

获取值后,需要将其解包

value = client.gather(q.get())

请注意,队列是一个小众市场,在分布式系统中用处不大。