xarray:大于使用 map_blocks 将结果转储到 .zarr 存储的内存数组
xarray: Larger than memory array using map_blocks dumping results into .zarr store
我正在尝试并行化一个操作,该操作会生成一个非常大的 numpy 数组,并且通常会耗尽 运行 正在使用它的机器的内存。
我想到的是以下工作流程:
- 使用Dask生成惰性零填充数组
- 使用 X-Array 生成 DataArray,使用先前的惰性零数组及其适当的坐标等...
- 使用
DataArray.map_blocks
我调用函数 write_values
从单独的文件中获取 Numpy 数组的子集,然后将它们插入适当的位置在 xarray.DataArray
.
- 使用
DataArray
的名称延迟转换为 xarray.Dataset
- 然后我尝试通过
to_zarr
存储到磁盘
首先:这是否适合处理循环遍历分块数组中的块的操作?
其次:当我运行这个程序时,它执行时会把我的内存炸毁,这可能是由于通过Dask创建的任务量造成的?我怎样才能优化以永远不会达到我机器的内存限制。
第三: 在这段代码 运行s 之后,我将一个 zarr 存储到磁盘中,但它似乎并没有真正存储我从中获得的值外部函数。这是更改磁盘存储阵列中值的正确方法吗?
问题:我将 .zarr
写入磁盘的函数不会写入 numpy_returning_volume[=52 中的值=].我在想可能是我需要在 map_blocks
函数中写入值?
谢谢!
完整示例:
import dask.array as da
import xarray as xr
import numpy as np
import pathlib
from dask.diagnostics import ProgressBar
class NumpyReturningVolume():
def __init__(self):
# self.data = da.random.random_sample([50000, 50000, 50000])
self.data = np.random.random_sample([500, 1000, 100])
def num_i(self):
return self.data.shape[0]
def num_j(self):
return self.data.shape[1]
def num_k(self):
return self.data.shape[2]
def get_float(self, start_coordinate, end_coordinate):
return self.data[
start_coordinate[0]:end_coordinate[0],
start_coordinate[1]:end_coordinate[1],
start_coordinate[2]:end_coordinate[2]
]
def write_values(chunk, **kwargs):
start_coordinate = (chunk.coords["track"].values[0], chunk.coords["bin"].values[0], chunk.coords["time"].values[0])
end_coordinate = (chunk.coords["track"].values[-1]+1, chunk.coords["bin"].values[-1]+1, chunk.coords["time"].values[-1]+1)
volume_data = kwargs["volume"].get_float(start_coordinate, end_coordinate)
chunk.data = volume_data
return(chunk)
seismic_file_path = pathlib.Path("./")
seismic_file_name = "TEST_FILE.ds"
store_path = seismic_file_path.parent.joinpath(
seismic_file_name + "_test.zarr")
numpy_returning_volume = NumpyReturningVolume()
dimensions = ('track', 'bin', 'time')
track_coords = np.arange(0, numpy_returning_volume.num_i(), 1, dtype=np.uint32)
bin_coords = np.arange(0, numpy_returning_volume.num_j(), 1, dtype=np.uint32)
time_coords = np.arange(0, numpy_returning_volume.num_k(), 1, dtype=np.uint32)
empty_arr = da.empty(shape=(
numpy_returning_volume.num_i(),
numpy_returning_volume.num_j(),
numpy_returning_volume.num_k()),
dtype=np.float32)
xarray_data = xr.DataArray(empty_arr, name="seis", coords={
'track': track_coords,
'bin': bin_coords, 'time': time_coords},
dims=dimensions)
xarray_data.map_blocks(write_values, kwargs={
"volume": numpy_returning_volume}, template=xarray_data).compute()
xarray_data = xarray_data.to_dataset(name="seis")
delayed_results = xarray_data.to_zarr(store_path.__str__(), compute=False)
with ProgressBar():
delayed_results.compute()
天哪!我刚刚意识到我的问题是世界上最简单的事情!我只需要设置一个等于地图块结果的变量,一切正常。如果有人感兴趣,这是完整的工作脚本。它生成了一个 6GB 的数据集
import dask.array as da
import xarray as xr
import numpy as np
import pathlib
from dask.diagnostics import ProgressBar
class NumpyReturningVolume():
def __init__(self):
self.data = da.random.random_sample([1000, 2000, 1000])
# self.data = np.random.random_sample([500, 1000, 100])
def num_i(self):
return self.data.shape[0]
def num_j(self):
return self.data.shape[1]
def num_k(self):
return self.data.shape[2]
def get_float(self, start_coordinate, end_coordinate):
return self.data[
start_coordinate[0]:end_coordinate[0],
start_coordinate[1]:end_coordinate[1],
start_coordinate[2]:end_coordinate[2]
].compute()
def write_values(chunk, **kwargs):
start_coordinate = (chunk.coords["track"].values[0], chunk.coords["bin"].values[0], chunk.coords["time"].values[0])
end_coordinate = (chunk.coords["track"].values[-1]+1, chunk.coords["bin"].values[-1]+1, chunk.coords["time"].values[-1]+1)
volume_data = kwargs["volume"].get_float(start_coordinate, end_coordinate)
chunk.data = volume_data
return(chunk)
seismic_file_path = pathlib.Path("./")
seismic_file_name = "TEST_FILE.ds"
store_path = seismic_file_path.parent.joinpath(
seismic_file_name + "_test.zarr")
numpy_returning_volume = NumpyReturningVolume()
dimensions = ('track', 'bin', 'time')
track_coords = np.arange(0, numpy_returning_volume.num_i(), 1, dtype=np.uint32)
bin_coords = np.arange(0, numpy_returning_volume.num_j(), 1, dtype=np.uint32)
time_coords = np.arange(0, numpy_returning_volume.num_k(), 1, dtype=np.uint32)
empty_arr = da.empty(shape=(
numpy_returning_volume.num_i(),
numpy_returning_volume.num_j(),
numpy_returning_volume.num_k()),
dtype=np.float32)
xarray_data = xr.DataArray(empty_arr, name="seis", coords={
'track': track_coords,
'bin': bin_coords, 'time': time_coords},
dims=dimensions)
# This xarray_data = is what I was missing!!
xarray_data = xarray_data.map_blocks(write_values, kwargs={
"volume": numpy_returning_volume}, template=xarray_data)
xarray_data = xarray_data.to_dataset(name="seis")
delayed_results = xarray_data.to_zarr(store_path.__str__(), compute=False)
with ProgressBar():
delayed_results.compute()
我正在尝试并行化一个操作,该操作会生成一个非常大的 numpy 数组,并且通常会耗尽 运行 正在使用它的机器的内存。
我想到的是以下工作流程:
- 使用Dask生成惰性零填充数组
- 使用 X-Array 生成 DataArray,使用先前的惰性零数组及其适当的坐标等...
- 使用
DataArray.map_blocks
我调用函数write_values
从单独的文件中获取 Numpy 数组的子集,然后将它们插入适当的位置在xarray.DataArray
. - 使用
DataArray
的名称延迟转换为 - 然后我尝试通过
to_zarr
存储到磁盘
xarray.Dataset
首先:这是否适合处理循环遍历分块数组中的块的操作?
其次:当我运行这个程序时,它执行时会把我的内存炸毁,这可能是由于通过Dask创建的任务量造成的?我怎样才能优化以永远不会达到我机器的内存限制。
第三: 在这段代码 运行s 之后,我将一个 zarr 存储到磁盘中,但它似乎并没有真正存储我从中获得的值外部函数。这是更改磁盘存储阵列中值的正确方法吗?
问题:我将 .zarr
写入磁盘的函数不会写入 numpy_returning_volume[=52 中的值=].我在想可能是我需要在 map_blocks
函数中写入值?
谢谢!
完整示例:
import dask.array as da
import xarray as xr
import numpy as np
import pathlib
from dask.diagnostics import ProgressBar
class NumpyReturningVolume():
def __init__(self):
# self.data = da.random.random_sample([50000, 50000, 50000])
self.data = np.random.random_sample([500, 1000, 100])
def num_i(self):
return self.data.shape[0]
def num_j(self):
return self.data.shape[1]
def num_k(self):
return self.data.shape[2]
def get_float(self, start_coordinate, end_coordinate):
return self.data[
start_coordinate[0]:end_coordinate[0],
start_coordinate[1]:end_coordinate[1],
start_coordinate[2]:end_coordinate[2]
]
def write_values(chunk, **kwargs):
start_coordinate = (chunk.coords["track"].values[0], chunk.coords["bin"].values[0], chunk.coords["time"].values[0])
end_coordinate = (chunk.coords["track"].values[-1]+1, chunk.coords["bin"].values[-1]+1, chunk.coords["time"].values[-1]+1)
volume_data = kwargs["volume"].get_float(start_coordinate, end_coordinate)
chunk.data = volume_data
return(chunk)
seismic_file_path = pathlib.Path("./")
seismic_file_name = "TEST_FILE.ds"
store_path = seismic_file_path.parent.joinpath(
seismic_file_name + "_test.zarr")
numpy_returning_volume = NumpyReturningVolume()
dimensions = ('track', 'bin', 'time')
track_coords = np.arange(0, numpy_returning_volume.num_i(), 1, dtype=np.uint32)
bin_coords = np.arange(0, numpy_returning_volume.num_j(), 1, dtype=np.uint32)
time_coords = np.arange(0, numpy_returning_volume.num_k(), 1, dtype=np.uint32)
empty_arr = da.empty(shape=(
numpy_returning_volume.num_i(),
numpy_returning_volume.num_j(),
numpy_returning_volume.num_k()),
dtype=np.float32)
xarray_data = xr.DataArray(empty_arr, name="seis", coords={
'track': track_coords,
'bin': bin_coords, 'time': time_coords},
dims=dimensions)
xarray_data.map_blocks(write_values, kwargs={
"volume": numpy_returning_volume}, template=xarray_data).compute()
xarray_data = xarray_data.to_dataset(name="seis")
delayed_results = xarray_data.to_zarr(store_path.__str__(), compute=False)
with ProgressBar():
delayed_results.compute()
天哪!我刚刚意识到我的问题是世界上最简单的事情!我只需要设置一个等于地图块结果的变量,一切正常。如果有人感兴趣,这是完整的工作脚本。它生成了一个 6GB 的数据集
import dask.array as da
import xarray as xr
import numpy as np
import pathlib
from dask.diagnostics import ProgressBar
class NumpyReturningVolume():
def __init__(self):
self.data = da.random.random_sample([1000, 2000, 1000])
# self.data = np.random.random_sample([500, 1000, 100])
def num_i(self):
return self.data.shape[0]
def num_j(self):
return self.data.shape[1]
def num_k(self):
return self.data.shape[2]
def get_float(self, start_coordinate, end_coordinate):
return self.data[
start_coordinate[0]:end_coordinate[0],
start_coordinate[1]:end_coordinate[1],
start_coordinate[2]:end_coordinate[2]
].compute()
def write_values(chunk, **kwargs):
start_coordinate = (chunk.coords["track"].values[0], chunk.coords["bin"].values[0], chunk.coords["time"].values[0])
end_coordinate = (chunk.coords["track"].values[-1]+1, chunk.coords["bin"].values[-1]+1, chunk.coords["time"].values[-1]+1)
volume_data = kwargs["volume"].get_float(start_coordinate, end_coordinate)
chunk.data = volume_data
return(chunk)
seismic_file_path = pathlib.Path("./")
seismic_file_name = "TEST_FILE.ds"
store_path = seismic_file_path.parent.joinpath(
seismic_file_name + "_test.zarr")
numpy_returning_volume = NumpyReturningVolume()
dimensions = ('track', 'bin', 'time')
track_coords = np.arange(0, numpy_returning_volume.num_i(), 1, dtype=np.uint32)
bin_coords = np.arange(0, numpy_returning_volume.num_j(), 1, dtype=np.uint32)
time_coords = np.arange(0, numpy_returning_volume.num_k(), 1, dtype=np.uint32)
empty_arr = da.empty(shape=(
numpy_returning_volume.num_i(),
numpy_returning_volume.num_j(),
numpy_returning_volume.num_k()),
dtype=np.float32)
xarray_data = xr.DataArray(empty_arr, name="seis", coords={
'track': track_coords,
'bin': bin_coords, 'time': time_coords},
dims=dimensions)
# This xarray_data = is what I was missing!!
xarray_data = xarray_data.map_blocks(write_values, kwargs={
"volume": numpy_returning_volume}, template=xarray_data)
xarray_data = xarray_data.to_dataset(name="seis")
delayed_results = xarray_data.to_zarr(store_path.__str__(), compute=False)
with ProgressBar():
delayed_results.compute()