Is there a way to optimize my list comprehension for better performance? It is slower than a for loop
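I am trying to optimize my code for looping over an ASC raster file. The inputs to the function are the data array from the ASC file with shape 1,000 x 1,000 (1 million data points), the ASC file information, and a column skip value. The skip value is not important in this case.
My function with the for loop performs decently and skips an array cell if the data == nodata_value. Here is the function: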
def asc_process_single(self, asc_array, asc_info, skip=1):
    # ncols = asc_info['ncols']
    nrows = asc_info['nrows']
    xllcornor = asc_info['xllcornor']
    yllcornor = asc_info['yllcornor']
    cellsize = asc_info['cellsize']
    nodata_value = asc_info['nodata_value']
    raster_size_y = cellsize*nrows
    # raster_size_x = cellsize*ncols
    # Looping over array rows and cols with skipping
    xyz = []
    for row in range(asc_array.shape[0])[::skip]:
        for col in range(asc_array.shape[1])[::skip]:
            val_z = asc_array[row, col]  # Z value of datapoint
            # The no data value is not processed
            if val_z == nodata_value:
                pass
            else:
                # Xcoordinate for current Z value
                val_x = xllcornor + (col * cellsize)
                # Ycoordinate for current Z value
                val_y = yllcornor + raster_size_y - (row * cellsize)
                # x, y, z to LIST
                xyz.append([val_x, val_y, val_z])
    return xyz
The timing on an ASC file that contains nodata_value(s), over 7 repeats, is:
593 ms ± 34.4 ms per loop (mean ± std. dev. of 10 runs, 1 loop each)
I thought I could do this better with a list comprehension:
def asc_process_single_listcomprehension(self, asc_array, asc_info, skip=1):
    # ncols = asc_info['ncols']
    nrows = asc_info['nrows']
    xllcornor = asc_info['xllcornor']
    yllcornor = asc_info['yllcornor']
    cellsize = asc_info['cellsize']
    nodata_value = asc_info['nodata_value']
    raster_size_y = cellsize*nrows
    # raster_size_x = cellsize*ncols
    # Looping over array rows and cols with skipping
    rows = range(asc_array.shape[0])[::skip]
    cols = range(asc_array.shape[1])[::skip]
    xyz = [[xllcornor + (col * cellsize),
            yllcornor + raster_size_y - (row * cellsize),
            asc_array[row, col]]
           for row in rows for col in cols
           if asc_array[row, col] != nodata_value]
    return xyz
However, this runs slower than my for loop, and I am wondering why:
757 ms ± 58.4 ms per loop (mean ± std. dev. of 10 runs, 1 loop each)
Is it because the list comprehension looks up asc_array[row, col] twice? That operation alone costs
193 ns ± 11.4 ns per loop (mean ± std. dev. of 7 runs, 10000000 loops each)
versus simply reusing the z value that was already looked up from the array, as in my for loop:
51.2 ns ± 1.18 ns per loop (mean ± std. dev. of 7 runs, 10000000 loops each)
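Doing this 1 million times would add up in the running time of the list comprehension.
Any ideas on how to optimize my list comprehension further, so that it performs better than my for loop? Any other ideas to improve performance?
EDIT:
SOLUTION:
I tried the 2 suggestions that were given.
- Reference my Z value in the list comprehension instead of looking it up in the array twice, which took longer.
- Rewrite the function to handle the problem with numpy arrays.
My rewritten list comprehension: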
xyz = [[xllcornor + (col * cellsize),
        yllcornor + raster_size_y - (row * cellsize),
        val_z]
       for row in rows for col in cols for val_z in
       [asc_array[row, col]]
       if val_z != nodata_value]
The numpy function became this:
def asc_process_numpy_single(self, asc_array, asc_info, skip):
    # ncols = asc_info['ncols']
    nrows = asc_info['nrows']
    xllcornor = asc_info['xllcornor']
    yllcornor = asc_info['yllcornor']
    cellsize = asc_info['cellsize']
    nodata_value = asc_info['nodata_value']
    raster_size_y = cellsize*nrows
    # raster_size_x = cellsize*ncols
    rows = np.arange(0,asc_array.shape[0],skip)[:,np.newaxis]
    cols = np.arange(0,asc_array.shape[1],skip)
    x = np.zeros((len(rows),len(cols))) + xllcornor + (cols * cellsize)
    y = np.zeros((len(rows),len(cols))) + yllcornor + raster_size_y - (rows *
        cellsize)
    z = asc_array[::skip,::skip]
    xyz = np.asarray([x,y,z]).T.transpose((1,0,2)).reshape(
        (int(len(rows)*len(cols)), 3) )
    mask = (xyz[:,2] != nodata_value)
    xyz = xyz[mask]
    return xyz
I added the mask in the last two lines of the numpy function because I do not want the nodata_values.
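The masking step can be sketched on a tiny array. The values and the -9999 nodata marker below are made up for illustration. One caveat, offered tentatively: if the nodata marker were NaN, a `!=` comparison would keep every row, because NaN never compares equal to anything, so `np.isnan` would be needed instead.

```python
import numpy as np

# Hypothetical (x, y, z) rows; -9999.0 stands in for nodata_value.
nodata_value = -9999.0
xyz = np.array([[0.0, 2.0, 1.5],
                [1.0, 2.0, nodata_value],
                [0.0, 1.0, 2.5]])

# Boolean mask: True where the z column holds real data.
mask = xyz[:, 2] != nodata_value
filtered = xyz[mask]  # keeps the first and the last row

# If nodata were NaN, `!=` is True everywhere (NaN != NaN),
# so the mask has to be built with np.isnan instead.
xyz_nan = np.array([[0.0, 2.0, 1.5],
                    [1.0, 2.0, np.nan]])
naive_mask = xyz_nan[:, 2] != np.nan   # filters nothing
nan_mask = ~np.isnan(xyz_nan[:, 2])    # drops the NaN row
```

Indexing with a boolean array copies the selected rows, so the cost of the filter grows with the number of rows kept.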
The timings are as follows, in this order: for loop, list comprehension, suggested list comprehension, and suggested numpy function:
609 ms ± 44.8 ms per loop (mean ± std. dev. of 10 runs, 1 loop each)
706 ms ± 22 ms per loop (mean ± std. dev. of 10 runs, 1 loop each)
604 ms ± 21.5 ms per loop (mean ± std. dev. of 10 runs, 1 loop each)
70.4 ms ± 1.26 ms per loop (mean ± std. dev. of 10 runs, 1 loop each)
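The optimized list comprehension is on par with the for loop, but the numpy function speeds things up by a factor of about 9.
Thank you very much for your comments and suggestions. I learned a lot today.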
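The only thing I can think of that slows you down is that in the original code you put asc_array[row, col] into a temporary variable, while in the list comprehension you evaluate it twice.
There are two things you might want to try:
- use the walrus operator in the "if" clause to assign the value to val_z, or
- add for val_z in [asc_array[row, col]] after the other two for clauses.
Good luck.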
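A minimal sketch of both suggestions, assuming Python 3.8 or later for the walrus operator. The small nested list `grid` and the constants are made-up stand-ins for `asc_array` and the `asc_info` values; with a numpy array the lookup would be `asc_array[row, col]` instead of `grid[row][col]`.

```python
# Hypothetical stand-ins for asc_array and the asc_info values.
grid = [[1.0, -9999.0, 3.0],
        [4.0, 5.0, -9999.0]]
nodata_value = -9999.0
xllcornor, yllcornor, cellsize = 100.0, 200.0, 10.0
raster_size_y = cellsize * len(grid)
rows = range(len(grid))
cols = range(len(grid[0]))

# Option 1: the walrus operator binds val_z inside the filter,
# so each cell is looked up only once.
xyz_walrus = [[xllcornor + col * cellsize,
               yllcornor + raster_size_y - row * cellsize,
               val_z]
              for row in rows for col in cols
              if (val_z := grid[row][col]) != nodata_value]

# Option 2: a one-element inner loop binds val_z (also works before 3.8).
xyz_inner_for = [[xllcornor + col * cellsize,
                  yllcornor + raster_size_y - row * cellsize,
                  val_z]
                 for row in rows for col in cols
                 for val_z in [grid[row][col]]
                 if val_z != nodata_value]
```

Both variants produce the same list. The filter is evaluated before the output expression, which is why `val_z` is already bound when the `[x, y, z]` triple is built.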
Yes, evaluating your array twice adds computation time. Here is my test case:
import numpy as np

def funLoop(A):
    xyz = []
    for row in range(A.shape[0]):
        for col in range(A.shape[1]):
            xyz.append([col, row, A[row, col]])

def funListComp1(A):
    # One array lookup per cell
    xyz = [[col, row, A[row, col]]
           for row in range(A.shape[0]) for col in range(A.shape[1])]

def funListComp2(A):
    # Two array lookups per cell
    xyz = [[col, A[row, col], A[row, col]]
           for row in range(A.shape[0]) for col in range(A.shape[1])]

A = np.random.rand(1000,1000)
%timeit funLoop(A)
%timeit funListComp1(A)
%timeit funListComp2(A)
457 ms ± 70.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
378 ms ± 8.89 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
779 ms ± 309 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
For large data you should always prefer numpy over Python for loops. In your case, the numpy code looks somewhat like this:
def asc_process_single_numpy(asc_array):
    nodata_value = np.nan
    raster_size_y = 1
    skip = 2
    xllcornor = 0
    yllcornor = 0
    cellsize = 1
    rows = np.arange(0,asc_array.shape[0],skip)[:,np.newaxis]
    cols = np.arange(0,asc_array.shape[1],skip)
    #for row in rows for col in cols
    x = np.zeros((len(rows),len(cols))) + xllcornor + (cols * cellsize)
    y = np.zeros((len(rows),len(cols))) + yllcornor + raster_size_y - (rows * cellsize)
    z = asc_array[::skip,::skip]
    return np.asarray([x,y,z]).T.transpose((1,0,2)).reshape( (int(len(rows)*len(cols)), 3) )

A = np.random.rand(1000,1000)
%timeit asc_process_single(A)
%timeit asc_process_single_listcomprehension(A)
%timeit asc_process_single_numpy(A)
183 ms ± 13 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
210 ms ± 2.05 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
11.3 ms ± 222 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
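As a cross-check, the vectorized approach can also be written with `np.meshgrid` and compared against the plain loop on a small array. This is only a sketch: the function names `xyz_loop` and `xyz_vectorized`, the free-standing parameters, and the test data are made up here and are not taken from the question.

```python
import numpy as np

def xyz_loop(arr, xll, yll, cellsize, nodata, skip=1):
    """Reference implementation: plain nested loops."""
    size_y = cellsize * arr.shape[0]
    out = []
    for row in range(0, arr.shape[0], skip):
        for col in range(0, arr.shape[1], skip):
            z = arr[row, col]
            if z != nodata:
                out.append([xll + col * cellsize,
                            yll + size_y - row * cellsize,
                            z])
    return out

def xyz_vectorized(arr, xll, yll, cellsize, nodata, skip=1):
    """Same rows as xyz_loop, built from whole-array operations."""
    size_y = cellsize * arr.shape[0]
    rows = np.arange(0, arr.shape[0], skip)
    cols = np.arange(0, arr.shape[1], skip)
    # indexing="ij" yields grids shaped (len(rows), len(cols)),
    # so flattening follows the same row-major order as the loop.
    row_grid, col_grid = np.meshgrid(rows, cols, indexing="ij")
    z = arr[::skip, ::skip]
    keep = z != nodata  # boolean mask of cells that hold data
    return np.column_stack((xll + col_grid[keep] * cellsize,
                            yll + size_y - row_grid[keep] * cellsize,
                            z[keep]))

# Hypothetical test data: a 6 x 5 grid with two nodata cells.
arr = np.arange(30, dtype=float).reshape(6, 5)
arr[0, 2] = arr[4, 4] = -9999.0
expected = np.array(xyz_loop(arr, 10.0, 20.0, 2.0, -9999.0, skip=2))
result = xyz_vectorized(arr, 10.0, 20.0, 2.0, -9999.0, skip=2)
```

Applying the mask before stacking means the full coordinate table for the skipped or nodata cells is never built, which may save some memory compared with filtering at the end.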