The missing example | pre-fetch and pre-process data using threads
There seem to be many open questions about using TensorFlow, and some of the TensorFlow developers are active here on Stack Overflow. Here is another one. I want to generate training data on the fly in other threads using numpy, or anything else that does not belong to TensorFlow. However, I do not want to recompile the entire TensorFlow source again and again. I am simply waiting for another way. `tf.py_func` seems like a workaround. But this is related to [how-to-prefetch-data-using-a-custom-python-function-in-tensorflow][1].
Here is my MnWE (minimal not-working example):
Update (now there is output, but there is also a race condition):
import numpy as np
import tensorflow as tf
import threading
import os
import random
import matplotlib.pyplot as plt

IMAGE_ROOT = "/graphics/projects/data/mscoco2014/data/images/"
files = ["train/COCO_train2014_000000178763.jpg",
         "train/COCO_train2014_000000543841.jpg",
         "train/COCO_train2014_000000364433.jpg",
         "train/COCO_train2014_000000091123.jpg",
         "train/COCO_train2014_000000498916.jpg",
         "train/COCO_train2014_000000429865.jpg",
         "train/COCO_train2014_000000400199.jpg",
         "train/COCO_train2014_000000230367.jpg",
         "train/COCO_train2014_000000281214.jpg",
         "train/COCO_train2014_000000041920.jpg"]

# --------------------------------------------------------------------------------

def pre_process(data):
    """Pre-process an image with arbitrary functions,
    not only tf functions but arbitrary Python code.
    """
    # here is the place to do some fancy stuff
    # which might be out of the scope of tf
    return data[0:81, 0, 0].flatten()


def populate_queue(sess, thread_pool, qData_enqueue_op):
    """Put data into the queue.

    Responsible for ensuring there is always data
    for TensorFlow to process.
    """
    # until somebody tells me I can stop ...
    while not thread_pool.should_stop():
        # get a random image from MS COCO
        idx = random.randint(0, len(files) - 1)
        data = np.array(plt.imread(os.path.join(IMAGE_ROOT, files[idx])))
        data = pre_process(data)
        # put it into the queue
        sess.run(qData_enqueue_op, feed_dict={data_input: data})


# a simple queue for gathering data (just to keep it simple for now)
qData = tf.FIFOQueue(100, [tf.float32], shapes=[[9, 9]])
data_input = tf.placeholder(tf.float32)
qData_enqueue_op = qData.enqueue([tf.reshape(data_input, [9, 9])])
qData_dequeue_op = qData.dequeue()

init_op = tf.initialize_all_variables()

with tf.Session() as sess:
    # init all variables
    sess.run(init_op)
    # coordinator for the pool of threads
    thread_pool = tf.train.Coordinator()
    # start filling in data
    t = threading.Thread(target=populate_queue,
                         args=(sess, thread_pool, qData_enqueue_op))
    t.start()
    # Can I use "tf.train.start_queue_runners" here?
    # How to use multiple threads?
    try:
        while not thread_pool.should_stop():
            print("iter")
            # HERE THE SILENCE BEGINS !!!!!!!!!!!
            batch = sess.run([qData_dequeue_op])
            print(batch)
    except tf.errors.OutOfRangeError:
        print('Done training -- no more data')
    finally:
        # When done, ask the threads to stop ...
        thread_pool.request_stop()
        # ... now they should definitely stop
        thread_pool.request_stop()
        thread_pool.join([t])
I basically have three questions:
- What is wrong with this code? It hangs forever (and that is hard to debug). See the line marked "HERE THE SILENCE BEGINS ...".
- How can this code be extended to use more threads?
- Is it worth converting large datasets, or data that can be generated on the fly, to tf.Record?
You have a mistake in this line:
t = threading.Thread(target=populate_queue, args=(sess, thread_pool, qData))
It should be qData_enqueue_op instead of qData. Otherwise your enqueue operations fail, and you get stuck trying to dequeue from a queue of size 0. I saw this when trying to run your code, and got
TypeError: Fetch argument <google3.third_party.tensorflow.python.ops.data_flow_ops.FIFOQueue object at 0x4bc1f10> of <google3.third_party.tensorflow.python.ops.data_flow_ops.FIFOQueue object at 0x4bc1f10> has invalid type <class 'google3.third_party.tensorflow.python.ops.data_flow_ops.FIFOQueue'>, must be a string or Tensor. (Can not convert a FIFOQueue into a Tensor or Operation.)
Regarding your other questions:
- You don't need to start queue runners in this example, because you don't have any. Queue runners are created by input producers like string_input_producer, which is essentially a FIFO queue plus the logic for starting threads. You are replicating 50% of a queue runner's functionality by starting your own thread that runs the enqueue op. (The other 50% is closing the queue.)
- RE: converting to tf.record -- Python has something called the Global Interpreter Lock, which means that two pieces of Python code cannot execute concurrently. In practice, this is mitigated by the fact that a lot of time is spent in numpy C++ code or in IO operations (which release the GIL). So I think it is a matter of checking whether you are able to achieve the required parallelism with a Python pre-processing pipeline.
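To answer the "how to use more threads" question concretely, here is a framework-free sketch of the same producer/consumer pattern using only the Python standard library: `queue.Queue` stands in for `tf.FIFOQueue` and a `threading.Event` stands in for `tf.train.Coordinator`. The function `load_and_preprocess` is a hypothetical stand-in for the image loading and pre-processing in the post; the point is only that scaling to N threads just means starting N producer threads on the same bounded queue.

```python
import threading
import queue

def load_and_preprocess(idx):
    # hypothetical stand-in for reading + pre-processing one image
    return [float(idx)] * 81

def producer(data_queue, stop_event):
    """Keep filling the queue until asked to stop (like populate_queue)."""
    idx = 0
    while not stop_event.is_set():
        try:
            # a bounded put() blocks when the queue is full, which
            # throttles producers; the timeout lets us re-check stop_event
            data_queue.put(load_and_preprocess(idx), timeout=0.1)
            idx += 1
        except queue.Full:
            continue

data_queue = queue.Queue(maxsize=100)   # analogue of tf.FIFOQueue(100, ...)
stop_event = threading.Event()          # analogue of tf.train.Coordinator

# scaling to more threads is just starting more producers
threads = [threading.Thread(target=producer, args=(data_queue, stop_event))
           for _ in range(4)]
for t in threads:
    t.start()

# consumer side: analogue of sess.run(qData_dequeue_op) in a loop
batches = [data_queue.get() for _ in range(10)]

stop_event.set()        # ask the producer threads to stop ...
for t in threads:
    t.join()            # ... and wait until they actually do
```

The same shutdown discipline as in the TensorFlow code applies: signal the stop condition first, then join every producer, otherwise a producer blocked on a full queue can keep the process alive.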