TensorFlow race conditions when chaining multiple queues
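I would like to compute the mean of each RGB channel of a set of images in a multithreaded manner.
My idea was to have a string_input_producer fill a filename_queue, and then have a second FIFOQueue filled by num_threads threads that load images from the filenames in filename_queue, perform some ops on them, and enqueue the result.
The second queue is then read by a single thread (the main thread), which sums up all the values from the queue.
This is my code: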
# variables for storing the mean and some intermediate results
mean = tf.Variable([0.0, 0.0, 0.0])
total = tf.Variable(0.0)
# the filename queue and the ops to read from it
filename_queue = tf.train.string_input_producer(filenames, num_epochs=1)
reader = tf.WholeFileReader()
_, value = reader.read(filename_queue)
image = tf.image.decode_jpeg(value, channels=3)
image = tf.cast(image, tf.float32)
sum = tf.reduce_sum(image, [0, 1])
num = tf.mul(tf.shape(image)[0], tf.shape(image)[1])
num = tf.cast(num, tf.float32)
# the second queue and its enqueue op
queue = tf.FIFOQueue(1000, dtypes=[tf.float32, tf.float32], shapes=[[3], []])
enqueue_op = queue.enqueue([sum, num])
# the ops performed by the main thread
img_sum, img_num = queue.dequeue()
mean_op = tf.add(mean, img_sum)
total_op = tf.add(total, img_num)
# adding new queue runner that performs enqueue_op on num_threads threads
qr = tf.train.QueueRunner(queue, [enqueue_op] * num_threads)
tf.train.add_queue_runner(qr)
init_op = tf.initialize_all_variables()
sess = tf.Session()
sess.run(init_op)
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess=sess, coord=coord)
# the main loop being executed until the OutOfRangeError
# (when filename_queue does not yield elements anymore)
try:
    while not coord.should_stop():
        mean, total = sess.run([mean_op, total_op])
except tf.errors.OutOfRangeError:
    print 'All images processed.'
finally:
    coord.request_stop()
coord.join(threads)
# some additional computations to get the mean
total_3channel = tf.pack([total, total, total])
mean = tf.div(mean, total_3channel)
mean = sess.run(mean)
print mean
The problem is that I get a different result every time I run this function, for example:
[ 99.35347748 58.35261154 44.56705856]
[ 95.91153717 92.54192352 87.48269653]
[ 124.991745 121.83417511 121.1891861 ]
I blame this on race conditions. But where do those race conditions come from? Can someone help me out?
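Your QueueRunner will start num_threads threads, which will race to access your reader and push the result onto the queue. The order of the images on queue will vary depending on which thread finishes first.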
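The ordering effect described above can be reproduced without TensorFlow. The following is a minimal sketch using only Python's standard threading and queue modules as a stand-in for the two chained queues; it is not how QueueRunner is implemented, and run_pipeline is a hypothetical helper name introduced for illustration. Several workers pull values from the first queue and push their squares onto the second one.

```python
import queue
import threading


def run_pipeline(num_threads, limit=5):
    """Chain two queues: workers square values from the first into the second."""
    values = queue.Queue()
    squares = queue.Queue()
    for v in range(limit):
        values.put(v)

    def worker():
        while True:
            try:
                v = values.get_nowait()
            except queue.Empty:
                return  # first queue is exhausted, this worker is done
            squares.put(v * v)

    threads = [threading.Thread(target=worker) for _ in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    # All producers have finished, so draining with empty() is safe here.
    arrival_order = []
    while not squares.empty():
        arrival_order.append(squares.get())
    return arrival_order


order = run_pipeline(num_threads=2)
print(order)          # arrival order may differ between runs
print(sum(order))     # the sum does not depend on the arrival order
```

With more than one worker the arrival order on the second queue is not guaranteed, while the set of values and their sum stay the same. With a single worker the order follows the first queue.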
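Update Feb 12
A simple example of chaining two queues and summing up the values from the second queue. When using num_threads > 1, there is some non-determinism in the intermediate values, but the final value will always be 30. When num_threads=1, everything is deterministic.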
import numpy as np
import tensorflow as tf

tf.reset_default_graph()
queue_dtype = np.int32
# value_queue is a queue that will be filled with 0,1,2,3,4
# range_input_producer creates the queue and registers its queue_runner
value_queue = tf.train.range_input_producer(limit=5, num_epochs=1, shuffle=False)
value = value_queue.dequeue()
# value_squared_queue will be filled with 0,1,4,9,16
value_squared_queue = tf.FIFOQueue(capacity=50, dtypes=queue_dtype)
value_squared_enqueue = value_squared_queue.enqueue(tf.square(value))
value_squared = value_squared_queue.dequeue()
# value_squared_sum keeps running sum of squares of values
value_squared_sum = tf.Variable(0)
value_squared_sum_update = value_squared_sum.assign_add(value_squared)
# register queue_runner in the global queue runners collection
num_threads = 2
qr = tf.train.QueueRunner(value_squared_queue, [value_squared_enqueue] * num_threads)
tf.train.queue_runner.add_queue_runner(qr)
sess = tf.InteractiveSession()
sess.run(tf.initialize_all_variables())
tf.train.start_queue_runners()
for i in range(5):
    sess.run([value_squared_sum_update])
    print sess.run([value_squared_sum])
You should see:
[0]
[1]
[5]
[14]
[30]
Or sometimes (when the order of the first 2 values is flipped):
[1]
[1]
[5]
[14]
[30]
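One reason the final value is stable in this example is that the running total is written back into the variable with assign_add, and addition does not depend on order. By contrast, an op such as tf.add(variable, x) only returns a new tensor and leaves the variable unchanged, so a loop that keeps just the latest result depends on which element arrived last. This may be relevant to the loop in the question, which uses tf.add rather than an assign op. The plain-Python sketch below (no TensorFlow; accumulate and keep_last are hypothetical helper names, and the arrival orders are made up for illustration) shows the difference.

```python
def accumulate(arrivals):
    """Running sum that is written back on every step, like assign_add."""
    total = 0
    history = []
    for x in arrivals:
        total += x
        history.append(total)
    return history


def keep_last(arrivals):
    """Add each arrival to a start value that is never updated,
    similar to calling tf.add on a variable that is never assigned to."""
    start = 0
    result = None
    for x in arrivals:
        result = start + x
    return result


in_order = [0, 1, 4, 9, 16]
flipped = [1, 0, 4, 9, 16]      # first two values swapped
late_swap = [0, 1, 4, 16, 9]    # last two values swapped

for arrivals in (in_order, flipped, late_swap):
    print(accumulate(arrivals), keep_last(arrivals))
```

The accumulated history reproduces both print-outs above and always ends at 30, whereas the keep-last result changes as soon as a different element happens to arrive last.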