如何在 tensorflow 中使用自定义 python 函数预取数据
How to prefetch data using a custom python function in tensorflow
我正在尝试预取训练数据以隐藏 I/O 延迟。我想编写自定义 Python 代码,从磁盘加载数据并预处理数据(例如,通过添加上下文窗口)。也就是说,一个线程做数据预处理,另一个做训练。这在 TensorFlow 中可行吗?
更新:我根据 @mrry 的示例写出了一个可运行的示例。
import numpy as np
import tensorflow as tf
import threading
BATCH_SIZE = 5
TRAINING_ITERS = 4100
feature_input = tf.placeholder(tf.float32, shape=[128])
label_input = tf.placeholder(tf.float32, shape=[128])
q = tf.FIFOQueue(200, [tf.float32, tf.float32], shapes=[[128], [128]])
enqueue_op = q.enqueue([label_input, feature_input])
# Closing with cancel_pending_enqueues=True wakes a loader thread that is
# blocked inside sess.run(enqueue_op) because the queue is full.
close_op = q.close(cancel_pending_enqueues=True)
label_batch, feature_batch = q.dequeue_many(BATCH_SIZE)
c = tf.reshape(feature_batch, [BATCH_SIZE, 128]) + tf.reshape(label_batch, [BATCH_SIZE, 128])
sess = tf.Session()


def load_and_enqueue(sess, enqueue_op, coord):
    """Feed (label, feature) records from disk into the queue until asked to stop.

    Each record is 128 float32 values from ``dummy_data/features.bin`` and 128
    float32 values from ``dummy_data/labels.bin``. When either file has fewer
    than 128 values left, both files are rewound and reading starts over, so
    the loader cycles over the data until ``coord`` requests a stop or the
    queue is closed.

    Args:
        sess: session used to run ``enqueue_op``.
        enqueue_op: op that enqueues the values fed to ``label_input`` and
            ``feature_input``.
        coord: ``tf.train.Coordinator`` polled between records.
    """
    # The files hold raw float32 values, so open them in binary mode.
    with open('dummy_data/features.bin', 'rb') as feature_file, \
            open('dummy_data/labels.bin', 'rb') as label_file:
        try:
            while not coord.should_stop():
                feature_array = np.fromfile(feature_file, np.float32, 128)
                label_value = np.fromfile(label_file, np.float32, 128)
                # Check BOTH files and require a full record: a short read from
                # either one (EOF or a truncated tail) cannot be fed to a [128]
                # placeholder.
                if feature_array.shape[0] < 128 or label_value.shape[0] < 128:
                    print('reach end of file, reset using seek(0,0)')
                    feature_file.seek(0, 0)
                    label_file.seek(0, 0)
                    continue
                sess.run(enqueue_op, feed_dict={feature_input: feature_array,
                                                label_input: label_value})
        except tf.errors.CancelledError:
            # The main thread closed the queue during shutdown; exit quietly.
            pass


coord = tf.train.Coordinator()
t = threading.Thread(target=load_and_enqueue, args=(sess, enqueue_op, coord))
t.start()
for i in range(TRAINING_ITERS):
    # Named batch_sum so the builtin sum() is not shadowed.
    batch_sum = sess.run(c)
    print('train_iter='+str(i))
    print(batch_sum)
coord.request_stop()
# The loader only checks should_stop() between records. If it is blocked on a
# full queue, closing the queue is what actually unblocks it, so join() can't hang.
sess.run(close_op)
coord.join([t])
sess.close()
这是一个常见的用例,大多数实现使用 TensorFlow 的队列(queues)将预处理代码与训练代码解耦。有一篇关于如何使用队列的教程,但主要步骤如下:
定义一个队列,q
,它将缓冲预处理后的数据。TensorFlow 支持按入队顺序生成元素的简单 tf.FIFOQueue
,以及以随机顺序生成元素的更高级的 tf.RandomShuffleQueue
。队列元素是一个或多个张量(可以具有不同类型和形状)的元组。所有队列都支持单元素 (enqueue
, dequeue
) 和批处理 (enqueue_many
, dequeue_many
) 操作,但要使用批处理操作,您必须在构造队列时指定队列元素中每个张量的形状。
构建一个子图,将预处理后的元素排入队列。一种方法是为单个输入样本对应的张量定义一些 tf.placeholder()
操作,然后将它们传给 q.enqueue()
。(如果您的预处理一次生成一个批次,则应改用 q.enqueue_many()
。)您还可以在此子图中包含 TensorFlow 操作。
构建一个执行训练的子图。它看起来像一个常规的 TensorFlow 图,但会通过调用 q.dequeue_many(BATCH_SIZE)
获取其输入。
启动您的会话。
创建一个或多个执行预处理逻辑的线程,然后运行入队操作,输入预处理后的数据。您可能会发现 tf.train.Coordinator
和 tf.train.QueueRunner
这两个实用工具类对此很有用。
像往常一样运行您的训练图(优化器等)。
编辑: 这是一个简单的 load_and_enqueue()
函数和代码片段,可帮助您入门:
# Features are length-100 vectors of floats
feature_input = tf.placeholder(tf.float32, shape=[100])
# Labels are scalar integers.
label_input = tf.placeholder(tf.int32, shape=[])
# Alternatively, could do:
# feature_batch_input = tf.placeholder(tf.float32, shape=[None, 100])
# label_batch_input = tf.placeholder(tf.int32, shape=[None])
q = tf.FIFOQueue(100, [tf.float32, tf.int32], shapes=[[100], []])
enqueue_op = q.enqueue([feature_input, label_input])
# For batch input, do:
# enqueue_op = q.enqueue_many([feature_batch_input, label_batch_input])
# Closing the queue makes a pending dequeue_many() fail with OutOfRangeError
# once fewer than BATCH_SIZE elements remain, instead of blocking forever.
close_op = q.close()
feature_batch, label_batch = q.dequeue_many(BATCH_SIZE)
# Build rest of model taking label_batch, feature_batch as input.
# [...]
train_op = ...
sess = tf.Session()


def load_and_enqueue():
    """Stream (feature, label) records from disk into ``q`` until input runs out.

    Each record is 100 float32 values read from the feature file plus one
    int32 value read from the label file. The queue is closed on exit so the
    training loop stops instead of waiting for data that will never arrive.
    """
    # Open in binary mode: np.fromfile reads raw binary values, not text.
    with open(..., 'rb') as feature_file, open(..., 'rb') as label_file:
        try:
            while True:
                feature_array = np.fromfile(feature_file, np.float32, 100)
                # The label comes from the *label* file, not the feature file.
                label_array = np.fromfile(label_file, np.int32, 1)
                # `if not feature_array` raises "truth value is ambiguous" for a
                # multi-element array; compare sizes instead. A short read from
                # either file means EOF (or a truncated final record).
                if feature_array.size < 100 or label_array.size < 1:
                    return
                sess.run(enqueue_op, feed_dict={feature_input: feature_array,
                                                label_input: label_array[0]})
        finally:
            sess.run(close_op)


# Start a thread to enqueue data asynchronously, and hide I/O latency.
# daemon=True: if training finishes first, a loader blocked on a full queue
# must not keep the process alive.
t = threading.Thread(target=load_and_enqueue, daemon=True)
t.start()
try:
    for _ in range(TRAINING_EPOCHS):
        sess.run(train_op)
except tf.errors.OutOfRangeError:
    # The loader closed the queue: the input files are exhausted.
    pass
也就是说,一个线程做数据预处理,另一个做训练。这在 TensorFlow 中可行吗?
是的,可以。mrry 的解决方案有效,但还有更简单的方法。
加载数据
tf.py_func
包装了一个 Python 函数,并将其用作 TensorFlow 运算符。因此我们可以在每次调用 sess.run()
时加载数据。这种方法的问题在于,数据是在 sess.run()
期间由主线程加载的。
一个最小的例子:
def get_numpy_tensor():
    """Return a fixed 2x2 float32 array (placeholder for real loading logic)."""
    values = [[1, 2], [3, 4]]
    return np.asarray(values, dtype=np.float32)
# Wrap the NumPy function as a graph op; it takes no inputs and yields float32.
tensorflow_tensor = tf.py_func(func=get_numpy_tensor, inp=[], Tout=tf.float32)
一个更复杂的例子:
def get_numpy_tensors():
    """Return an (input, target) pair of NumPy values.

    Stand-in for loading data from disk: returns a fixed 2x2 float32 array
    and the int32 scalar 1.
    """
    # Load data from the disk into numpy arrays.
    # Named input_array rather than `input` so the builtin is not shadowed.
    input_array = np.array([[1, 2], [3, 4]], dtype=np.float32)
    target = np.int32(1)
    return input_array, target
# Wrap the NumPy loader as a graph op with one float32 and one int32 output.
tensorflow_input, tensorflow_target = tf.py_func(
    func=get_numpy_tensors, inp=[], Tout=[tf.float32, tf.int32])
tensorflow_input = 2 * tensorflow_input
tensorflow_target = 2 * tensorflow_target
sess = tf.InteractiveSession()
# get_numpy_tensors() is invoked as part of this sess.run() call.
numpy_input, numpy_target = sess.run([tensorflow_input, tensorflow_target])
assert np.all(numpy_input == np.array([[2, 4], [6, 8]]))
assert numpy_target == 2
在另一个线程中预取数据
要在另一个线程中排队我们的数据(这样 sess.run()
就不必等待数据),我们可以对来自 tf.py_func()
的运算符使用 tf.train.batch()
。
一个最小的例子:
# tf.train.batch needs a fully defined shape per tensor; none was set on
# tensorflow_tensor, so supply it explicitly via `shapes`.
tensor_shape = get_numpy_tensor().shape
tensorflow_tensors = tf.train.batch(
    tensors=[tensorflow_tensor],
    batch_size=32,
    shapes=[tensor_shape],
)
# Run `tf.train.start_queue_runners()` once session is created.
如果 tensorflow_tensor
指定了它的形状,我们可以省略参数 shapes
:
tensor_shape = get_numpy_tensor().shape
# Attach the static shape so tf.train.batch can infer `shapes` on its own.
tensorflow_tensor.set_shape(tensor_shape)
tensorflow_tensors = tf.train.batch(tensors=[tensorflow_tensor], batch_size=32)
# Run `tf.train.start_queue_runners()` once session is created.
一个更复杂的例子:
input_shape, target_shape = (2, 2), ()


def get_numpy_tensors():
    """Generate one random (input, target) example.

    Returns a float32 array of shape ``input_shape`` filled by
    ``np.random.rand`` and an int32 target drawn from [0, 10). Prints ``f``
    (no newline) on every call so the producer/consumer interleaving is visible.
    """
    # Named input_array rather than `input` so the builtin is not shadowed.
    input_array = np.random.rand(*input_shape).astype(np.float32)
    target = np.random.randint(10, dtype=np.int32)
    print('f', end='')
    return input_array, target
tensorflow_input, tensorflow_target = tf.py_func(get_numpy_tensors, [], [tf.float32, tf.int32])
batch_size = 2
tensorflow_inputs, tensorflow_targets = tf.train.batch(
    [tensorflow_input, tensorflow_target], batch_size,
    shapes=[input_shape, target_shape], capacity=2)
# `capacity` counts individual elements, not batches: the internal queue holds
# at most `capacity=2` `[tensorflow_input, tensorflow_target]` pairs, i.e. one
# batch of `batch_size=2`.
tensorflow_inputs, tensorflow_targets = 2*tensorflow_inputs, 2*tensorflow_targets
sess = tf.InteractiveSession()
# Internally, `tf.train.batch` uses a QueueRunner, so we need to ask tf to start it.
# The Coordinator lets us stop and join those background threads at the end.
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(coord=coord)
for _ in range(10):
    numpy_inputs, numpy_targets = sess.run([tensorflow_inputs, tensorflow_targets])
    assert (numpy_inputs.shape == (batch_size, *input_shape)
            and numpy_targets.shape == (batch_size, *target_shape))
    print('r', end='')
# Prints e.g. `fffffrrffrfrffrffrffrffrffrffrf`; the exact interleaving of
# producer `f`s and consumer `r`s depends on thread scheduling.
coord.request_stop()
coord.join(threads)
如果 get_numpy_tensor()
返回的是一批张量,那么 tf.train.batch(..., enqueue_many=True)
会有帮助。
我正在尝试预取训练数据以隐藏 I/O 延迟。我想编写自定义 Python 代码,从磁盘加载数据并预处理数据(例如,通过添加上下文窗口)。也就是说,一个线程做数据预处理,另一个做训练。这在 TensorFlow 中可行吗?
更新:我根据 @mrry 的示例写出了一个可运行的示例。
import numpy as np
import tensorflow as tf
import threading
BATCH_SIZE = 5
TRAINING_ITERS = 4100
feature_input = tf.placeholder(tf.float32, shape=[128])
label_input = tf.placeholder(tf.float32, shape=[128])
q = tf.FIFOQueue(200, [tf.float32, tf.float32], shapes=[[128], [128]])
enqueue_op = q.enqueue([label_input, feature_input])
# Closing with cancel_pending_enqueues=True wakes a loader thread that is
# blocked inside sess.run(enqueue_op) because the queue is full.
close_op = q.close(cancel_pending_enqueues=True)
label_batch, feature_batch = q.dequeue_many(BATCH_SIZE)
c = tf.reshape(feature_batch, [BATCH_SIZE, 128]) + tf.reshape(label_batch, [BATCH_SIZE, 128])
sess = tf.Session()


def load_and_enqueue(sess, enqueue_op, coord):
    """Feed (label, feature) records from disk into the queue until asked to stop.

    Each record is 128 float32 values from ``dummy_data/features.bin`` and 128
    float32 values from ``dummy_data/labels.bin``. When either file has fewer
    than 128 values left, both files are rewound and reading starts over, so
    the loader cycles over the data until ``coord`` requests a stop or the
    queue is closed.

    Args:
        sess: session used to run ``enqueue_op``.
        enqueue_op: op that enqueues the values fed to ``label_input`` and
            ``feature_input``.
        coord: ``tf.train.Coordinator`` polled between records.
    """
    # The files hold raw float32 values, so open them in binary mode.
    with open('dummy_data/features.bin', 'rb') as feature_file, \
            open('dummy_data/labels.bin', 'rb') as label_file:
        try:
            while not coord.should_stop():
                feature_array = np.fromfile(feature_file, np.float32, 128)
                label_value = np.fromfile(label_file, np.float32, 128)
                # Check BOTH files and require a full record: a short read from
                # either one (EOF or a truncated tail) cannot be fed to a [128]
                # placeholder.
                if feature_array.shape[0] < 128 or label_value.shape[0] < 128:
                    print('reach end of file, reset using seek(0,0)')
                    feature_file.seek(0, 0)
                    label_file.seek(0, 0)
                    continue
                sess.run(enqueue_op, feed_dict={feature_input: feature_array,
                                                label_input: label_value})
        except tf.errors.CancelledError:
            # The main thread closed the queue during shutdown; exit quietly.
            pass


coord = tf.train.Coordinator()
t = threading.Thread(target=load_and_enqueue, args=(sess, enqueue_op, coord))
t.start()
for i in range(TRAINING_ITERS):
    # Named batch_sum so the builtin sum() is not shadowed.
    batch_sum = sess.run(c)
    print('train_iter='+str(i))
    print(batch_sum)
coord.request_stop()
# The loader only checks should_stop() between records. If it is blocked on a
# full queue, closing the queue is what actually unblocks it, so join() can't hang.
sess.run(close_op)
coord.join([t])
sess.close()
这是一个常见的用例,大多数实现使用 TensorFlow 的队列(queues)将预处理代码与训练代码解耦。有一篇关于如何使用队列的教程,但主要步骤如下:
定义一个队列,
q
,它将缓冲预处理后的数据。TensorFlow 支持按入队顺序生成元素的简单 tf.FIFOQueue
,以及以随机顺序生成元素的更高级的 tf.RandomShuffleQueue
。队列元素是一个或多个张量(可以具有不同类型和形状)的元组。所有队列都支持单元素 (enqueue
, dequeue
) 和批处理 (enqueue_many
, dequeue_many
) 操作,但要使用批处理操作,您必须在构造队列时指定队列元素中每个张量的形状。构建一个子图,将预处理后的元素排入队列。一种方法是为单个输入样本对应的张量定义一些
tf.placeholder()
操作,然后将它们传给 q.enqueue()
。(如果您的预处理一次生成一个批次,则应改用 q.enqueue_many()
。)您还可以在此子图中包含 TensorFlow 操作。构建一个执行训练的子图。它看起来像一个常规的 TensorFlow 图,但会通过调用
q.dequeue_many(BATCH_SIZE)
获取其输入。
启动您的会话。
创建一个或多个执行预处理逻辑的线程,然后运行入队操作,输入预处理后的数据。您可能会发现
tf.train.Coordinator
和 tf.train.QueueRunner
这两个实用工具类对此很有用。像往常一样运行您的训练图(优化器等)。
编辑: 这是一个简单的 load_and_enqueue()
函数和代码片段,可帮助您入门:
# Features are length-100 vectors of floats
feature_input = tf.placeholder(tf.float32, shape=[100])
# Labels are scalar integers.
label_input = tf.placeholder(tf.int32, shape=[])
# Alternatively, could do:
# feature_batch_input = tf.placeholder(tf.float32, shape=[None, 100])
# label_batch_input = tf.placeholder(tf.int32, shape=[None])
q = tf.FIFOQueue(100, [tf.float32, tf.int32], shapes=[[100], []])
enqueue_op = q.enqueue([feature_input, label_input])
# For batch input, do:
# enqueue_op = q.enqueue_many([feature_batch_input, label_batch_input])
# Closing the queue makes a pending dequeue_many() fail with OutOfRangeError
# once fewer than BATCH_SIZE elements remain, instead of blocking forever.
close_op = q.close()
feature_batch, label_batch = q.dequeue_many(BATCH_SIZE)
# Build rest of model taking label_batch, feature_batch as input.
# [...]
train_op = ...
sess = tf.Session()


def load_and_enqueue():
    """Stream (feature, label) records from disk into ``q`` until input runs out.

    Each record is 100 float32 values read from the feature file plus one
    int32 value read from the label file. The queue is closed on exit so the
    training loop stops instead of waiting for data that will never arrive.
    """
    # Open in binary mode: np.fromfile reads raw binary values, not text.
    with open(..., 'rb') as feature_file, open(..., 'rb') as label_file:
        try:
            while True:
                feature_array = np.fromfile(feature_file, np.float32, 100)
                # The label comes from the *label* file, not the feature file.
                label_array = np.fromfile(label_file, np.int32, 1)
                # `if not feature_array` raises "truth value is ambiguous" for a
                # multi-element array; compare sizes instead. A short read from
                # either file means EOF (or a truncated final record).
                if feature_array.size < 100 or label_array.size < 1:
                    return
                sess.run(enqueue_op, feed_dict={feature_input: feature_array,
                                                label_input: label_array[0]})
        finally:
            sess.run(close_op)


# Start a thread to enqueue data asynchronously, and hide I/O latency.
# daemon=True: if training finishes first, a loader blocked on a full queue
# must not keep the process alive.
t = threading.Thread(target=load_and_enqueue, daemon=True)
t.start()
try:
    for _ in range(TRAINING_EPOCHS):
        sess.run(train_op)
except tf.errors.OutOfRangeError:
    # The loader closed the queue: the input files are exhausted.
    pass
也就是说,一个线程做数据预处理,另一个做训练。这在 TensorFlow 中可行吗?
是的,可以。mrry 的解决方案有效,但还有更简单的方法。
加载数据
tf.py_func
包装了一个 Python 函数,并将其用作 TensorFlow 运算符。因此我们可以在每次调用 sess.run()
时加载数据。这种方法的问题在于,数据是在 sess.run()
期间由主线程加载的。
一个最小的例子:
def get_numpy_tensor():
    """Return a fixed 2x2 float32 array (placeholder for real loading logic)."""
    values = [[1, 2], [3, 4]]
    return np.asarray(values, dtype=np.float32)
# Wrap the NumPy function as a graph op; it takes no inputs and yields float32.
tensorflow_tensor = tf.py_func(func=get_numpy_tensor, inp=[], Tout=tf.float32)
一个更复杂的例子:
def get_numpy_tensors():
    """Return an (input, target) pair of NumPy values.

    Stand-in for loading data from disk: returns a fixed 2x2 float32 array
    and the int32 scalar 1.
    """
    # Load data from the disk into numpy arrays.
    # Named input_array rather than `input` so the builtin is not shadowed.
    input_array = np.array([[1, 2], [3, 4]], dtype=np.float32)
    target = np.int32(1)
    return input_array, target
# Wrap the NumPy loader as a graph op with one float32 and one int32 output.
tensorflow_input, tensorflow_target = tf.py_func(
    func=get_numpy_tensors, inp=[], Tout=[tf.float32, tf.int32])
tensorflow_input = 2 * tensorflow_input
tensorflow_target = 2 * tensorflow_target
sess = tf.InteractiveSession()
# get_numpy_tensors() is invoked as part of this sess.run() call.
numpy_input, numpy_target = sess.run([tensorflow_input, tensorflow_target])
assert np.all(numpy_input == np.array([[2, 4], [6, 8]]))
assert numpy_target == 2
在另一个线程中预取数据
要在另一个线程中排队我们的数据(这样 sess.run()
就不必等待数据),我们可以对来自 tf.py_func()
的运算符使用 tf.train.batch()
。
一个最小的例子:
# tf.train.batch needs a fully defined shape per tensor; none was set on
# tensorflow_tensor, so supply it explicitly via `shapes`.
tensor_shape = get_numpy_tensor().shape
tensorflow_tensors = tf.train.batch(
    tensors=[tensorflow_tensor],
    batch_size=32,
    shapes=[tensor_shape],
)
# Run `tf.train.start_queue_runners()` once session is created.
如果 tensorflow_tensor
指定了它的形状,我们可以省略参数 shapes
:
tensor_shape = get_numpy_tensor().shape
# Attach the static shape so tf.train.batch can infer `shapes` on its own.
tensorflow_tensor.set_shape(tensor_shape)
tensorflow_tensors = tf.train.batch(tensors=[tensorflow_tensor], batch_size=32)
# Run `tf.train.start_queue_runners()` once session is created.
一个更复杂的例子:
input_shape, target_shape = (2, 2), ()


def get_numpy_tensors():
    """Generate one random (input, target) example.

    Returns a float32 array of shape ``input_shape`` filled by
    ``np.random.rand`` and an int32 target drawn from [0, 10). Prints ``f``
    (no newline) on every call so the producer/consumer interleaving is visible.
    """
    # Named input_array rather than `input` so the builtin is not shadowed.
    input_array = np.random.rand(*input_shape).astype(np.float32)
    target = np.random.randint(10, dtype=np.int32)
    print('f', end='')
    return input_array, target
tensorflow_input, tensorflow_target = tf.py_func(get_numpy_tensors, [], [tf.float32, tf.int32])
batch_size = 2
tensorflow_inputs, tensorflow_targets = tf.train.batch(
    [tensorflow_input, tensorflow_target], batch_size,
    shapes=[input_shape, target_shape], capacity=2)
# `capacity` counts individual elements, not batches: the internal queue holds
# at most `capacity=2` `[tensorflow_input, tensorflow_target]` pairs, i.e. one
# batch of `batch_size=2`.
tensorflow_inputs, tensorflow_targets = 2*tensorflow_inputs, 2*tensorflow_targets
sess = tf.InteractiveSession()
# Internally, `tf.train.batch` uses a QueueRunner, so we need to ask tf to start it.
# The Coordinator lets us stop and join those background threads at the end.
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(coord=coord)
for _ in range(10):
    numpy_inputs, numpy_targets = sess.run([tensorflow_inputs, tensorflow_targets])
    assert (numpy_inputs.shape == (batch_size, *input_shape)
            and numpy_targets.shape == (batch_size, *target_shape))
    print('r', end='')
# Prints e.g. `fffffrrffrfrffrffrffrffrffrffrf`; the exact interleaving of
# producer `f`s and consumer `r`s depends on thread scheduling.
coord.request_stop()
coord.join(threads)
如果 get_numpy_tensor()
返回的是一批张量,那么 tf.train.batch(..., enqueue_many=True)
会有帮助。