将部分结果存储在 dask 延迟迭代中
Store partial results in dask delayed iterations
我有一个当前的长迭代过程,其中我 运行 计算并且每 x
迭代我将结果存储到数据库中。
例如,在 range(20)
上迭代 fun()
函数并使用 save_results
保存每 5 个结果:
import time
def fun(x):
time.sleep(0.1*x)
return(0.1*x)
def save_results(result):
# originally stores the new data to DB
print result # print as example
result = []
for i in range(20):
result.append(fun(i))
if i%5==4:
save_results(result[-5:])
我想使用 dask
delayed
和 compute
方法并行处理该过程。但是如果我 运行 它像下面的例子一样, store_results
出现在 compute
:
之前
import dask as da
result = []
for i in range(20):
result.append(da.delayed(fun)(i))
if i%5==4:
save_results(result[-5:])
result = da.compute(result)[0]
因此,我不是每 5 次迭代存储一次结果,而是存储一个延迟对象列表:
[Delayed('fun-202f7e28-e594-4926-a5cd-5931dbc99d6b'),
Delayed('fun-d2bf2bc9-a4f3-46d7-adb7-84114a68b482'),
Delayed('fun-c34f2c04-3e25-47fa-8165-1ee7c786aaf6'),
Delayed('fun-a4edd3fc-442d-4ec1-8a0e-320bd9315a61'),
Delayed('fun-c7b48e2c-cb66-472e-85c5-fe6c595fa1ec')]
如何解决这个问题,并将每 5 个新结果存储到数据库中?
您应该延迟对延迟对象进行操作的任何函数调用
顺序码
result = []
for i in range(20):
result.append(fun(i))
if i%5==4:
save_results(result[-5:])
并行代码
def fun(x):
...
result = []
side_effects = []
for i in range(20):
result = dask.delayed(fun)(i)
results.append(result)
if i%5==4:
value = dask.delayed(save_results)(result[-5:])
side_effects.append(value)
dask.compute(results + side_effects)
我有一个当前的长迭代过程,其中我 运行 计算并且每 x
迭代我将结果存储到数据库中。
例如,在 range(20)
上迭代 fun()
函数并使用 save_results
保存每 5 个结果:
import time
def fun(x):
time.sleep(0.1*x)
return(0.1*x)
def save_results(result):
# originally stores the new data to DB
print result # print as example
result = []
for i in range(20):
result.append(fun(i))
if i%5==4:
save_results(result[-5:])
我想使用 dask
delayed
和 compute
方法并行处理该过程。但是如果我 运行 它像下面的例子一样, store_results
出现在 compute
:
import dask as da
result = []
for i in range(20):
result.append(da.delayed(fun)(i))
if i%5==4:
save_results(result[-5:])
result = da.compute(result)[0]
因此,我不是每 5 次迭代存储一次结果,而是存储一个延迟对象列表:
[Delayed('fun-202f7e28-e594-4926-a5cd-5931dbc99d6b'), Delayed('fun-d2bf2bc9-a4f3-46d7-adb7-84114a68b482'), Delayed('fun-c34f2c04-3e25-47fa-8165-1ee7c786aaf6'), Delayed('fun-a4edd3fc-442d-4ec1-8a0e-320bd9315a61'), Delayed('fun-c7b48e2c-cb66-472e-85c5-fe6c595fa1ec')]
如何解决这个问题,并将每 5 个新结果存储到数据库中?
您应该延迟对延迟对象进行操作的任何函数调用
顺序码
result = []
for i in range(20):
result.append(fun(i))
if i%5==4:
save_results(result[-5:])
并行代码
def fun(x):
...
result = []
side_effects = []
for i in range(20):
result = dask.delayed(fun)(i)
results.append(result)
if i%5==4:
value = dask.delayed(save_results)(result[-5:])
side_effects.append(value)
dask.compute(results + side_effects)