运行 使用 Dask 对单条数据进行令人尴尬的并行操作
Running Embarrasingly Parallel operations on a single piece of data using Dask
我正在学习 this 教程,我能够并行化一个 for 循环,其中操作是在多个文件上独立完成的。但是,现在我需要执行迭代函数以将变量从单个 xarray 数据集(来源)提取到 300 个文件中。我的函数如下所示:
def build_data(parameters)
extract var1 from source
extract var2 from source
extract var3 from source
..........
return('finished')
我已经设置了 6 个工人并且 运行 这个函数循环了 300 次,我实现了 :
source = xr.opendataset()
build_delayed = []
for i in range (0,300):
task = dask.delayed (build_data) (params)
build_delayed.append(task)
dask.compute(*build_delayed)
但这不起作用,而且似乎延迟正在增加,因为所有工作人员都在尝试访问同一块数据。此外,当循环结束时,None
在 build_delayed[ ].
中返回
我在这里错过了什么?在这种情况下如何让它并行工作?
(我的详细question.)
编辑:MVCE:
stations = []
for key in valid_stations:
station = pd.read_csv('~/projects/LIS_verification/pyLISF/data/IFI-Observations/flow_2/'+key+'.csv')
station.reset_index(drop = True,inplace=True)
station.set_index('Date',inplace=True)
station.index = pd.to_datetime(station.index)
stations.append(station)
routedat = xr.opendataset('/LDAS/02-ILDAS/OUTPUT/EXP002/SURFACEMODEL/file1.nc')
def build_data(stations,routedat,i) :
try :
start_date = stations[i]['Streamflow (cumecs)'].first_valid_index()
lis_date = routedat['time'].values
return (start_date,lis_date)
except Exception as e :
return( start_date,str(e))
for i in range (0,10):
task = dask.delayed (build_data) (stations,routedat,i)
build_delayed.append(task)
dask.compute(*build_delayed)
我得到了所需的输出,但与顺序循环相比花费的时间太大(508 毫秒对 31 秒)。
更新:我能够通过使用命令 .compute(scheduler='synchronous')
在 < 300 毫秒内并行地 运行 它成功
这只是一个猜测,您是否尝试创建一个 reduce 函数并对其进行计算?
类似于:
@dask.delayed
def delayed_reduce(array):
out = ''
for entry in array:
print (entry)
out = out + str(entry) + ' '
return out
build_delayed = []
for i in range (0,300):
task = dask.delayed (build_data) (params)
build_delayed.append(task)
out = delayed_reduce(build_delayed)
value = out.compute()
这只是代码的框架,但您明白了。如果您想调试正在发生的事情,请添加一个不同的 return 值并将其打印到某个文件。
另外,你没有提到你是如何 运行 昏昏欲睡的。你在使用分布式吗?如果这样做,请确保您已初始化工作程序和调度程序。
希望这个框架代码能帮助您定位问题。
我正在学习 this 教程,我能够并行化一个 for 循环,其中操作是在多个文件上独立完成的。但是,现在我需要执行迭代函数以将变量从单个 xarray 数据集(来源)提取到 300 个文件中。我的函数如下所示:
def build_data(parameters)
extract var1 from source
extract var2 from source
extract var3 from source
..........
return('finished')
我已经设置了 6 个工人并且 运行 这个函数循环了 300 次,我实现了 :
source = xr.opendataset()
build_delayed = []
for i in range (0,300):
task = dask.delayed (build_data) (params)
build_delayed.append(task)
dask.compute(*build_delayed)
但这不起作用,而且似乎延迟正在增加,因为所有工作人员都在尝试访问同一块数据。此外,当循环结束时,None
在 build_delayed[ ].
我在这里错过了什么?在这种情况下如何让它并行工作?
(我的详细question.)
编辑:MVCE:
stations = []
for key in valid_stations:
station = pd.read_csv('~/projects/LIS_verification/pyLISF/data/IFI-Observations/flow_2/'+key+'.csv')
station.reset_index(drop = True,inplace=True)
station.set_index('Date',inplace=True)
station.index = pd.to_datetime(station.index)
stations.append(station)
routedat = xr.opendataset('/LDAS/02-ILDAS/OUTPUT/EXP002/SURFACEMODEL/file1.nc')
def build_data(stations,routedat,i) :
try :
start_date = stations[i]['Streamflow (cumecs)'].first_valid_index()
lis_date = routedat['time'].values
return (start_date,lis_date)
except Exception as e :
return( start_date,str(e))
for i in range (0,10):
task = dask.delayed (build_data) (stations,routedat,i)
build_delayed.append(task)
dask.compute(*build_delayed)
我得到了所需的输出,但与顺序循环相比花费的时间太大(508 毫秒对 31 秒)。
更新:我能够通过使用命令 .compute(scheduler='synchronous')
这只是一个猜测,您是否尝试创建一个 reduce 函数并对其进行计算?
类似于:
@dask.delayed
def delayed_reduce(array):
out = ''
for entry in array:
print (entry)
out = out + str(entry) + ' '
return out
build_delayed = []
for i in range (0,300):
task = dask.delayed (build_data) (params)
build_delayed.append(task)
out = delayed_reduce(build_delayed)
value = out.compute()
这只是代码的框架,但您明白了。如果您想调试正在发生的事情,请添加一个不同的 return 值并将其打印到某个文件。
另外,你没有提到你是如何 运行 昏昏欲睡的。你在使用分布式吗?如果这样做,请确保您已初始化工作程序和调度程序。
希望这个框架代码能帮助您定位问题。