multiprocessing.Pool does not speed up augmentation in tf.data
I want to use multiprocessing.Pool inside tf.data to speed up my augmentation function, but it turns out slower than a plain for loop.

multiprocessing.Pool takes about 72 s; the plain for loop takes about 57 s.

My environment: Python 3.6, tensorflow-gpu 2.4.0, Ubuntu 20.04.

Below is my code; what am I doing wrong?
Thanks in advance!
import numpy as np
import tensorflow as tf
from functools import partial
import multiprocessing

INPUT_SHAPE = (2000, 6)
OUTPUT_SHAPE = (200, 6)
def resizing(i, data, enable, choice):
    if i == 0:
        overlap = 0
    else:
        overlap = 5 if enable >= 0.5 else 0
    # Reduce all 6 columns of the window with the chosen statistic.
    window = data[i - overlap: i + 10 + overlap]
    if choice == 0:
        return list(np.mean(window, axis=0))
    elif choice == 1:
        return list(np.std(window, axis=0))
    elif choice == 2:
        return list(np.max(window, axis=0))
    elif choice == 3:
        return list(np.min(window, axis=0))
def resize_data(data, pool_obj):
    choice = tf.random.uniform(shape=(), minval=0, maxval=4, dtype=tf.int64).numpy()
    enable = tf.random.uniform(shape=(), minval=0, maxval=1, dtype=tf.float64).numpy()
    new_data = pool_obj.map(partial(resizing,
                                    data=data,
                                    enable=enable,
                                    choice=choice),
                            range(0, 2000, 10))
    # new_data = []
    # for i in range(0, 2000, 10):
    #     new_data.append(resizing(i, data, enable, choice))
    return np.array(new_data)
def augmentation(data, labels, pool_obj):
    def aug(data):
        data = data.numpy()
        ...
        # 2000 rows resized down to 200
        data = resize_data(data, pool_obj)
        ...
        return tf.convert_to_tensor(data, tf.float64)
    data = tf.py_function(aug, [data], [tf.float64])[0]
    data.set_shape(OUTPUT_SHAPE)
    return data, labels
def test(trainDS):
    for i, (X, y) in enumerate(trainDS):
        print(i, X.shape, y.shape)
if __name__ == '__main__':
    pool_obj = multiprocessing.Pool()
    trainDS = tf.data.Dataset.from_tensor_slices(getDataSet_Path())
    trainDS = (
        trainDS
        .map(load_data, num_parallel_calls=tf.data.AUTOTUNE)
        .cache()
        .shuffle(300, reshuffle_each_iteration=False)
        .map(partial(augmentation, pool_obj=pool_obj), num_parallel_calls=tf.data.AUTOTUNE)
        .batch(128, drop_remainder=True)
        .prefetch(tf.data.AUTOTUNE)
    )
    test(trainDS)
The TensorFlow Dataset API already ships with built-in parallelism. Just use the num_parallel_calls argument of map (and tf.data.AUTOTUNE as the prefetch buffer size); no pythonic multiprocessing tools are needed. Also, pass to map only TensorFlow-style functions that can be converted to a graph. In particular, avoid pythonic if blocks; try tf.cond, tf.where, and the like instead. NumPy routines are also not recommended; use their TensorFlow counterparts. Follow a guide like this.
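To make that advice concrete, here is a minimal graph-friendly sketch of the resizing step for the overlap=0 case: a reshape into non-overlapping windows plus tf.switch_case in place of the pythonic if/elif chain. The tf_resize name and the pipeline wiring are illustrative, not from the original post, and the overlapping-window variant (enable >= 0.5) would need extra handling, e.g. tf.signal.frame or explicit padding.

import tensorflow as tf

OUTPUT_SHAPE = (200, 6)

def tf_resize(data):
    # data: float64 tensor of shape (2000, 6).
    # Draw the statistic inside the graph; tf.random ops trace fine.
    choice = tf.random.uniform(shape=(), minval=0, maxval=4, dtype=tf.int32)
    # Non-overlapping windows of 10 rows become an axis to reduce over.
    windows = tf.reshape(data, (200, 10, 6))
    # tf.switch_case replaces the pythonic if/elif chain on `choice`.
    return tf.switch_case(
        choice,
        branch_fns=[
            lambda: tf.reduce_mean(windows, axis=1),
            lambda: tf.math.reduce_std(windows, axis=1),
            lambda: tf.reduce_max(windows, axis=1),
            lambda: tf.reduce_min(windows, axis=1),
        ],
    )

def augmentation(data, labels):
    # No tf.py_function and no pool: the whole map call stays in the graph.
    data = tf_resize(data)
    data.set_shape(OUTPUT_SHAPE)
    return data, labels

# trainDS = trainDS.map(augmentation, num_parallel_calls=tf.data.AUTOTUNE)

Once the tf.py_function wrapper is gone, num_parallel_calls can genuinely parallelize the map. A Python-level pool inside py_function, by contrast, mostly adds pickling and inter-process overhead (the whole (2000, 6) array is shipped to the workers on every call), which is a plausible reason it came out slower than the plain loop.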