将新对象放入 Dask 队列
Put new objects into Dask queue
我想使用 Dask 分布式队列与其他客户端共享一些数据。
我要分享的数据是一个 class 数据的对象:
其中索引是一个列表,数据是一个 dask 集合的未来,名称是一个字符串。
class data:
index = list()
data = None
def __init__(self, name):
self.name = name
我使用 register_generic 注册这个要序列化的新对象,如下所示:
from distributed.protocol import register_generic
register_generic(metadata)
ds = data(name)
ds.data = darray
ds.index = index
q = Queue("data").put(ds)
但是我得到了这个错误:
TypeError: can not serialize 'data' object
是否有其他可用的方法允许将新对象放入队列?
否则你建议改用什么?
来自队列的文档字符串:
Elements of the Queue must be either Futures or msgpack-encodable data
(ints, strings, lists, dicts). All data is sent through the scheduler so
it is wise not to send large objects. To share large objects scatter the
data and share the future instead.
在不扩展 msgpack 来编码你的 class 或发送替代表示(例如,JSON)的情况下,你应该将你的值包装在 Dask 未来中,就像这样
f = client.scatter(ds)
q.put(ds)
获取值后,需要将其解包
value = client.gather(q.get())
请注意,队列是一个小众市场,在分布式系统中用处不大。
我想使用 Dask 分布式队列与其他客户端共享一些数据。 我要分享的数据是一个 class 数据的对象: 其中索引是一个列表,数据是一个 dask 集合的未来,名称是一个字符串。
class data:
index = list()
data = None
def __init__(self, name):
self.name = name
我使用 register_generic 注册这个要序列化的新对象,如下所示:
from distributed.protocol import register_generic
register_generic(metadata)
ds = data(name)
ds.data = darray
ds.index = index
q = Queue("data").put(ds)
但是我得到了这个错误:
TypeError: can not serialize 'data' object
是否有其他可用的方法允许将新对象放入队列? 否则你建议改用什么?
来自队列的文档字符串:
Elements of the Queue must be either Futures or msgpack-encodable data (ints, strings, lists, dicts). All data is sent through the scheduler so it is wise not to send large objects. To share large objects scatter the data and share the future instead.
在不扩展 msgpack 来编码你的 class 或发送替代表示(例如,JSON)的情况下,你应该将你的值包装在 Dask 未来中,就像这样
f = client.scatter(ds)
q.put(ds)
获取值后,需要将其解包
value = client.gather(q.get())
请注意,队列是一个小众市场,在分布式系统中用处不大。