从 dask workers 写入 redis

Writing to redis from dask workers

假设我有一个函数可以进行一些处理并将结果存储到 redis 服务器

r = redis.StrictRedis()

def process(data):
    (...do some work...)
    r.put(...)

现在我有一大堆数据,我想用dask来并行处理。类似于

from dask.distributed imoprt Client
client = Client()
for x in data:
    client.submit(process,x)

但我得到 KeyError(<function process>)。有什么想法吗?

编辑

它根据下面@mrocklin 的回答工作,将连接初始化放在函数内。我假设随着工作人员的来来去去,连接将被破坏并重新创建。如果我重写我的函数以接受一批数据,效率会更高吗?

def process(batches_data):
    r = redis.StrictRedis()
    for batch in batches_data:
        (...do some work...)
        r.put(...)

我的第一个猜测是您的对象 r 没有很好地序列化。这是相当典型的,因为具有实时连接的对象通常拒绝被序列化(有充分的理由)。

相反,您可以尝试在函数内建立连接

def process(data):
    r = redis.StrictRedis()
    ... do some work
    r.put(...)

此外,我建议您持有 submit 生产的期货。否则 Dask 将假设您不再关心这些任务并决定它可以忽略它们

futures = [client.submit(process, x) for x in L]
wait(futures)

如果这不能解决您的问题,那么我建议使用更完整的异常和回溯来编辑您的原始问题。