TensorFlow:从多个线程入队和出队
TensorFlow : Enqueuing and dequeuing a queue from multiple threads
我试图解决的问题如下:
我有一个文件名列表 trainimgs
。我定义了一个
tf.RandomShuffleQueue
及其 capacity=len(trainimgs)
和 min_after_dequeue=0
。
- 此
tf.RandomShuffleQueue
预计将由 trainimgs
填充指定的 epochlimit
次。
- 许多线程应该并行工作。每个线程从
tf.RandomShuffleQueue
中取出一个元素并对其执行一些操作并将其排入另一个队列。我做对了那部分。
- 然而一旦
trainimgs
的 1 epoch
被处理并且 tf.RandomShuffleQueue
为空,前提是当前纪元 e < epochlimit
,队列必须再次被填满并且线程必须再次工作。
好消息是:我已经在特定情况下使用它了(请参阅最后的 PS!!)
坏消息是:我认为有更好的方法。
我现在使用的方法如下(我简化了功能并删除了基于图像处理的预处理和后续入队,但处理的核心保持不变!!):
with tf.Session() as sess:
train_filename_queue = tf.RandomShuffleQueue(capacity=len(trainimgs), min_after_dequeue=0, dtypes=tf.string, seed=0)
queue_size = train_filename_queue.size()
trainimgtensor = tf.constant(trainimgs)
close_queue = train_filename_queue.close()
epoch = tf.Variable(initial_value=1, trainable=False, dtype=tf.int32)
incrementepoch = tf.assign(epoch, epoch + 1, use_locking=True)
supplyimages = train_filename_queue.enqueue_many(trainimgtensor)
value = train_filename_queue.dequeue()
init_op = tf.group(tf.global_variables_initializer(), tf.local_variables_initializer())
sess.run(init_op)
coord = tf.train.Coordinator()
tf.train.start_queue_runners(sess, coord)
sess.run(supplyimages)
lock = threading.Lock()
threads = [threading.Thread(target=work, args=(coord, value, sess, epoch, incrementepoch, supplyimages, queue_size, lock, close_queue)) for i in range(200)]
for t in threads:
t.start()
coord.join(threads)
工作函数如下:
def work(coord, val, sess, epoch, incrementepoch, supplyimg, q, lock,\
close_op):
while not coord.should_stop():
if sess.run(q) > 0:
filename, currepoch = sess.run([val, epoch])
filename = filename.decode(encoding='UTF-8')
print(filename + ' ' + str(currepoch))
elif sess.run(epoch) < 2:
lock.acquire()
try:
if sess.run(q) == 0:
print("The previous epoch = %d"%(sess.run(epoch)))
sess.run([incrementepoch, supplyimg])
sz = sess.run(q)
print("The new epoch = %d"%(sess.run(epoch)))
print("The new queue size = %d"%(sz))
finally:
lock.release()
else:
try:
sess.run(close_op)
except tf.errors.CancelledError:
print('Queue already closed.')
coord.request_stop()
return None
所以,虽然这可行,但我觉得有更好、更简洁的方法来实现它。所以,简而言之,我的问题是:
- 在 TensorFlow 中是否有更简单、更清晰的方法来完成此任务?
- 这段代码逻辑有问题吗?我对多线程场景不是很有经验,所以任何没有引起我注意的明显错误都会对我很有帮助。
P.S : 看来这段代码终究不够完美。当我 运行 有 120 万张图像和 200 个线程时,它 运行。但是,当我 运行 它用于 10 个图像和 20 个线程时,它会出现以下错误:
CancelledError (see above for traceback): RandomShuffleQueue '_0_random_shuffle_queue' is closed.
[[Node: random_shuffle_queue_EnqueueMany = QueueEnqueueManyV2[Tcomponents=[DT_STRING], timeout_ms=-1, _device="/job:localhost/replica:0/task:0/cpu:0"](random_shuffle_queue, Const)]]
我以为 except tf.errors.CancelledError
涵盖了这些内容。这到底是怎么回事?
我建议让单个线程调用 enqueue_many 纪元时间将正确数量的图像排入队列。然后它可以关闭队列。这将使您简化工作函数和其他线程。
我终于找到了答案。问题是多个线程在 work()
函数的不同点上发生冲突。
以下 work()
函数完美运行。
def work(coord, val, sess, epoch, maxepochs, incrementepoch, supplyimg, q, lock, close_op):
print('I am thread number %s'%(threading.current_thread().name))
print('I can see a queue with size %d'%(sess.run(q)))
while not coord.should_stop():
lock.acquire()
if sess.run(q) > 0:
filename, currepoch = sess.run([val, epoch])
filename = filename.decode(encoding='UTF-8')
tid = threading.current_thread().name
print(filename + ' ' + str(currepoch) + ' thread ' + str(tid))
elif sess.run(epoch) < maxepochs:
print('Thread %s has acquired the lock'%(threading.current_thread().name))
print("The previous epoch = %d"%(sess.run(epoch)))
sess.run([incrementepoch, supplyimg])
sz = sess.run(q)
print("The new epoch = %d"%(sess.run(epoch)))
print("The new queue size = %d"%(sz))
else:
coord.request_stop()
lock.release()
return None
我认为 GIL 会阻止在这些线程中进行任何实际的并行处理。
要获得 tensorflow 的性能,您需要将数据保存在 tensorflow 中。
Tensor Flow 的 reading data guide 解释了如何解决一类非常相似的问题。
更具体地说,您似乎重写了 string_input_producer
.
的重要部分
我试图解决的问题如下:
我有一个文件名列表 trainimgs
。我定义了一个
tf.RandomShuffleQueue
及其capacity=len(trainimgs)
和min_after_dequeue=0
。- 此
tf.RandomShuffleQueue
预计将由trainimgs
填充指定的epochlimit
次。 - 许多线程应该并行工作。每个线程从
tf.RandomShuffleQueue
中取出一个元素并对其执行一些操作并将其排入另一个队列。我做对了那部分。 - 然而一旦
trainimgs
的1 epoch
被处理并且tf.RandomShuffleQueue
为空,前提是当前纪元e < epochlimit
,队列必须再次被填满并且线程必须再次工作。
好消息是:我已经在特定情况下使用它了(请参阅最后的 PS!!)
坏消息是:我认为有更好的方法。
我现在使用的方法如下(我简化了功能并删除了基于图像处理的预处理和后续入队,但处理的核心保持不变!!):
with tf.Session() as sess:
train_filename_queue = tf.RandomShuffleQueue(capacity=len(trainimgs), min_after_dequeue=0, dtypes=tf.string, seed=0)
queue_size = train_filename_queue.size()
trainimgtensor = tf.constant(trainimgs)
close_queue = train_filename_queue.close()
epoch = tf.Variable(initial_value=1, trainable=False, dtype=tf.int32)
incrementepoch = tf.assign(epoch, epoch + 1, use_locking=True)
supplyimages = train_filename_queue.enqueue_many(trainimgtensor)
value = train_filename_queue.dequeue()
init_op = tf.group(tf.global_variables_initializer(), tf.local_variables_initializer())
sess.run(init_op)
coord = tf.train.Coordinator()
tf.train.start_queue_runners(sess, coord)
sess.run(supplyimages)
lock = threading.Lock()
threads = [threading.Thread(target=work, args=(coord, value, sess, epoch, incrementepoch, supplyimages, queue_size, lock, close_queue)) for i in range(200)]
for t in threads:
t.start()
coord.join(threads)
工作函数如下:
def work(coord, val, sess, epoch, incrementepoch, supplyimg, q, lock,\
close_op):
while not coord.should_stop():
if sess.run(q) > 0:
filename, currepoch = sess.run([val, epoch])
filename = filename.decode(encoding='UTF-8')
print(filename + ' ' + str(currepoch))
elif sess.run(epoch) < 2:
lock.acquire()
try:
if sess.run(q) == 0:
print("The previous epoch = %d"%(sess.run(epoch)))
sess.run([incrementepoch, supplyimg])
sz = sess.run(q)
print("The new epoch = %d"%(sess.run(epoch)))
print("The new queue size = %d"%(sz))
finally:
lock.release()
else:
try:
sess.run(close_op)
except tf.errors.CancelledError:
print('Queue already closed.')
coord.request_stop()
return None
所以,虽然这可行,但我觉得有更好、更简洁的方法来实现它。所以,简而言之,我的问题是:
- 在 TensorFlow 中是否有更简单、更清晰的方法来完成此任务?
- 这段代码逻辑有问题吗?我对多线程场景不是很有经验,所以任何没有引起我注意的明显错误都会对我很有帮助。
P.S : 看来这段代码终究不够完美。当我 运行 有 120 万张图像和 200 个线程时,它 运行。但是,当我 运行 它用于 10 个图像和 20 个线程时,它会出现以下错误:
CancelledError (see above for traceback): RandomShuffleQueue '_0_random_shuffle_queue' is closed.
[[Node: random_shuffle_queue_EnqueueMany = QueueEnqueueManyV2[Tcomponents=[DT_STRING], timeout_ms=-1, _device="/job:localhost/replica:0/task:0/cpu:0"](random_shuffle_queue, Const)]]
我以为 except tf.errors.CancelledError
涵盖了这些内容。这到底是怎么回事?
我建议让单个线程调用 enqueue_many 纪元时间将正确数量的图像排入队列。然后它可以关闭队列。这将使您简化工作函数和其他线程。
我终于找到了答案。问题是多个线程在 work()
函数的不同点上发生冲突。
以下 work()
函数完美运行。
def work(coord, val, sess, epoch, maxepochs, incrementepoch, supplyimg, q, lock, close_op):
print('I am thread number %s'%(threading.current_thread().name))
print('I can see a queue with size %d'%(sess.run(q)))
while not coord.should_stop():
lock.acquire()
if sess.run(q) > 0:
filename, currepoch = sess.run([val, epoch])
filename = filename.decode(encoding='UTF-8')
tid = threading.current_thread().name
print(filename + ' ' + str(currepoch) + ' thread ' + str(tid))
elif sess.run(epoch) < maxepochs:
print('Thread %s has acquired the lock'%(threading.current_thread().name))
print("The previous epoch = %d"%(sess.run(epoch)))
sess.run([incrementepoch, supplyimg])
sz = sess.run(q)
print("The new epoch = %d"%(sess.run(epoch)))
print("The new queue size = %d"%(sz))
else:
coord.request_stop()
lock.release()
return None
我认为 GIL 会阻止在这些线程中进行任何实际的并行处理。
要获得 tensorflow 的性能,您需要将数据保存在 tensorflow 中。
Tensor Flow 的 reading data guide 解释了如何解决一类非常相似的问题。
更具体地说,您似乎重写了 string_input_producer
.