运行 使用 Dask 对单条数据进行令人尴尬的并行操作

Running Embarrasingly Parallel operations on a single piece of data using Dask

我正在学习 this 教程,我能够并行化一个 for 循环,其中操作是在多个文件上独立完成的。但是,现在我需要执行迭代函数以将变量从单个 xarray 数据集(来源)提取到 300 个文件中。我的函数如下所示:

def build_data(parameters)
    extract var1 from source
    extract var2 from source
    extract var3 from source
    ..........
    return('finished')

我已经设置了 6 个工人并且 运行 这个函数循环了 300 次,我实现了 :

source = xr.opendataset()
build_delayed = []
for i in range (0,300):
   task = dask.delayed (build_data) (params)
   build_delayed.append(task)
dask.compute(*build_delayed)

但这不起作用,而且似乎延迟正在增加,因为所有工作人员都在尝试访问同一块数据。此外,当循环结束时,None 在 build_delayed[ ].

中返回

我在这里错过了什么?在这种情况下如何让它并行工作?

(我的详细question.)

编辑:MVCE:

stations = []
for key in valid_stations:
    station = pd.read_csv('~/projects/LIS_verification/pyLISF/data/IFI-Observations/flow_2/'+key+'.csv')
    station.reset_index(drop = True,inplace=True)
    station.set_index('Date',inplace=True)
    station.index = pd.to_datetime(station.index)
    stations.append(station)

routedat = xr.opendataset('/LDAS/02-ILDAS/OUTPUT/EXP002/SURFACEMODEL/file1.nc')
    
def build_data(stations,routedat,i) :
    try :
        start_date = stations[i]['Streamflow (cumecs)'].first_valid_index()
        lis_date = routedat['time'].values
        return (start_date,lis_date)
    except Exception as e :
        return( start_date,str(e))
    
for i in range (0,10):
    task = dask.delayed (build_data) (stations,routedat,i)
    build_delayed.append(task)

dask.compute(*build_delayed)

我得到了所需的输出,但与顺序循环相比花费的时间太大(508 毫秒对 31 秒)。

更新:我能够通过使用命令 .compute(scheduler='synchronous')

在 < 300 毫秒内并行地 运行 它成功

这只是一个猜测,您是否尝试创建一个 reduce 函数并对其进行计算?

类似于:

@dask.delayed
def delayed_reduce(array):
    out = ''
    for entry in array:
        print (entry)
        out = out + str(entry) + ' '
    return out

build_delayed = []
for i in range (0,300):
   task = dask.delayed (build_data) (params)
   build_delayed.append(task)
out = delayed_reduce(build_delayed) 
value = out.compute()

这只是代码的框架,但您明白了。如果您想调试正在发生的事情,请添加一个不同的 return 值并将其打印到某个文件。

另外,你没有提到你是如何 运行 昏昏欲睡的。你在使用分布式吗?如果这样做,请确保您已初始化工作程序和调度程序。

希望这个框架代码能帮助您定位问题。