从 dask workers 写入 redis
Writing to redis from dask workers
假设我有一个函数可以进行一些处理并将结果存储到 redis 服务器
r = redis.StrictRedis()
def process(data):
(...do some work...)
r.put(...)
现在我有一大堆数据,我想用dask来并行处理。类似于
from dask.distributed imoprt Client
client = Client()
for x in data:
client.submit(process,x)
但我得到 KeyError(<function process>)
。有什么想法吗?
编辑
它根据下面@mrocklin 的回答工作,将连接初始化放在函数内。我假设随着工作人员的来来去去,连接将被破坏并重新创建。如果我重写我的函数以接受一批数据,效率会更高吗?
def process(batches_data):
r = redis.StrictRedis()
for batch in batches_data:
(...do some work...)
r.put(...)
我的第一个猜测是您的对象 r
没有很好地序列化。这是相当典型的,因为具有实时连接的对象通常拒绝被序列化(有充分的理由)。
相反,您可以尝试在函数内建立连接
def process(data):
r = redis.StrictRedis()
... do some work
r.put(...)
此外,我建议您持有 submit
生产的期货。否则 Dask 将假设您不再关心这些任务并决定它可以忽略它们
futures = [client.submit(process, x) for x in L]
wait(futures)
如果这不能解决您的问题,那么我建议使用更完整的异常和回溯来编辑您的原始问题。
假设我有一个函数可以进行一些处理并将结果存储到 redis 服务器
r = redis.StrictRedis()
def process(data):
(...do some work...)
r.put(...)
现在我有一大堆数据,我想用dask来并行处理。类似于
from dask.distributed imoprt Client
client = Client()
for x in data:
client.submit(process,x)
但我得到 KeyError(<function process>)
。有什么想法吗?
编辑
它根据下面@mrocklin 的回答工作,将连接初始化放在函数内。我假设随着工作人员的来来去去,连接将被破坏并重新创建。如果我重写我的函数以接受一批数据,效率会更高吗?
def process(batches_data):
r = redis.StrictRedis()
for batch in batches_data:
(...do some work...)
r.put(...)
我的第一个猜测是您的对象 r
没有很好地序列化。这是相当典型的,因为具有实时连接的对象通常拒绝被序列化(有充分的理由)。
相反,您可以尝试在函数内建立连接
def process(data):
r = redis.StrictRedis()
... do some work
r.put(...)
此外,我建议您持有 submit
生产的期货。否则 Dask 将假设您不再关心这些任务并决定它可以忽略它们
futures = [client.submit(process, x) for x in L]
wait(futures)
如果这不能解决您的问题,那么我建议使用更完整的异常和回溯来编辑您的原始问题。