multiprocessing.pool 无法在 tf.data 中加速

multiprocessing.pool cannot be accelerated in tf.data

我想在 tf.data 中使用 multiprocessing.pool 来加速我的增强功能。但是结果比正常的for循环慢。

multiprocessing.pool耗时约:72s

正常for循环成本约为:57s

我的环境:python3.6tensorflow-gpu2.4.0Ubuntu20.04

下面是我的代码,我做错了什么?

先决条件谢谢!

import numpy as np
import tensorflow as tf
from functools import partial
import multiprocessing

INPUT_SHAPE = (2000,6)
OUTPUT_SHAPE = (200,6)


def resizing(i ,data, enable, choice):
    if i==0:
        overlap=0
    else:
        overlap= 5 if enable >= 0.5 else 0
    if choice == 0:
        return [np.mean(data[i-overlap: i+10+overlap,0]),
                np.mean(data[i-overlap: i+10+overlap,1]),
                np.mean(data[i-overlap: i+10+overlap,2]),
                np.mean(data[i-overlap: i+10+overlap,3]),
                np.mean(data[i-overlap: i+10+overlap,4]),
                np.mean(data[i-overlap: i+10+overlap,5])]
    elif choice == 1:
        return [np.std(data[i-overlap: i+10+overlap,0]),
                np.std(data[i-overlap: i+10+overlap,1]),
                np.std(data[i-overlap: i+10+overlap,2]),
                np.std(data[i-overlap: i+10+overlap,3]),
                np.std(data[i-overlap: i+10+overlap,4]),
                np.std(data[i-overlap: i+10+overlap,5])]
    elif choice == 2:
        return [np.max(data[i-overlap: i+10+overlap,0]),
                np.max(data[i-overlap: i+10+overlap,1]),
                np.max(data[i-overlap: i+10+overlap,2]),
                np.max(data[i-overlap: i+10+overlap,3]),
                np.max(data[i-overlap: i+10+overlap,4]),
                np.max(data[i-overlap: i+10+overlap,5])]
    elif choice == 3:
        return [np.min(data[i-overlap: i+10+overlap,0]),
                np.min(data[i-overlap: i+10+overlap,1]),
                np.min(data[i-overlap: i+10+overlap,2]),
                np.min(data[i-overlap: i+10+overlap,3]),
                np.min(data[i-overlap: i+10+overlap,4]),
                np.min(data[i-overlap: i+10+overlap,5])]

def resize_data(data, pool_obj):

    choice = tf.random.uniform(shape=(), minval=0,maxval=4,dtype=tf.int64).numpy()
    enable = tf.random.uniform(shape=(), minval=0,maxval=1,dtype=tf.float64).numpy()
    new_data = pool_obj.map(partial(resizing, 
                                    data=data,
                                    enable=enable, 
                                    choice=choice), 
                            range(0,2000,10))
  # new_data = []
  # for i in range(0,2000,10):
  #     new_data.append(resizing(i ,data, enable, choice))
    
    return np.array(new_data)


def augmentation(data, labels, pool_obj):
    def aug(data):
        data = data.numpy()
        
        ...      

        # 2000 resize to 200
        data = resize_data(data, pool_obj)
        
        ...
        
        return tf.convert_to_tensor(data, tf.float64)

    data = tf.py_function(aug, [data], [tf.float64])[0]
    data.set_shape(OUTPUT_SHAPE)
    return data, labels

def test(trainDS):
    for d in trainDS:
        X, y = d
        print(i, X.shape, y.shape)
        

if __name__ == '__main__':
    pool_obj = multiprocessing.Pool()
    trainDS = tf.data.Dataset.from_tensor_slices(getDataSet_Path())
    trainDS = (
        trainDS
        .map(load_data, num_parallel_calls=tf.data.AUTOTUNE)
        .cache()
        .shuffle(300, reshuffle_each_iteration=False)
        .map(partial(augmentation, pool_obj=pool_obj), num_parallel_calls=tf.data.AUTOTUNE)
        .batch(128, drop_remainder=True)
        .prefetch(tf.data.AUTOTUNE)
    )
    
    test(trainDS)

TensorFlow 数据集 API 已经配备了内置多处理。只需在 mapprefetch 功能中使用 num_parallel_calls 参数,无需任何 pythonic 多处理工具。此外,仅将 TensorFlow 样式函数传递给可以转换为图形的 map。特别是,避免使用 pythonic if 块,尝试使用 tf.condtf.where 等。也不推荐使用 Numpy 例程,使用类似的 TensorFlow。按照像这样的指南 this.