Tensorflow:通过协调器停止线程似乎不起作用?
Tensorflow: Stopping threads via Coordinator seems not to work?
在 之后,我通过一些额外的验证代码扩展了上述示例,请参阅附件代码。也就是说,每第 i 个训练步骤,学习模型都会在验证集(在我的例子中是几个)上进行评估。无法通过队列提供验证集,因此一个可能的想法是使用共享变量构建一个额外的推理图。
这不知何故有效,但训练完成后,程序挂起(在coord.join()
)并最终抛出异常:Coordinator stopped with threads still running:...
然后异步加载线程也抛出异常。 coordinator
异常可以通过 try/except
子句解决(见下面的代码),但异步线程仍然抛出异常(虽然这不会妨碍主程序,但我认为不应该发生---它有 while
循环,应该告诉它停止)。
有趣的是,如果在没有任何评估代码运行的情况下进行训练(即if (it+1)%stop == 0:
之后的块被注释掉),那么coord.join()
根本不会挂起。
我的问题:我在这里做错了什么?好像 .request_stop()
没有按照我希望的那样做?
import tensorflow as tf
import numpy as np
# some parameters
btsz = 100 # batch size
some_shape = 20 # size of one input (no of dims)
iters = 1000 # that many single training steps
ith = 10 # run validation sets every so often
# datastores (sort of complex backends, SQL like)
ds_train = ... # the one for training
ds_val1, ds_val2, ds_val3 = ... # having the validation data
def async_load(coord, session, queue, datastore,
tf_input, tf_target):
"""
Feed queue in async way. Inputs can be extracted
from datastore only one row at a time.
"""
while not coord.should_stop():
input = extract_one_input_as_numpy(datastore)
target = extract_numpy_from(datastore) # either 0 or 1
session.run(queue, feed_dict={tf_input: input, tf_target: target})
def evaluate(sess, datastore, tf_input, tf_target, tf_loss, btsz):
"""
Evaluate current model (represented as tf_loss) on a datastore.
"""
loss = []
for i in xrange(something):
input_batch = collect_btsz_many_single examples(datastore)
target_batch = same_for_targets(datastore)
tmp, = sess.run([tf_loss], feed_dict={tf_input:input_batch, tf_target:target_batch})
loss.append(tmp)
return np.mean(loss)
def log_reg(input, target, W, b):
"""
Simple logistic regression model.
"""
y = tf.matmul(input, W) + b
y_bin = tf.to_int32(y > 0)
t_bin = tf.to_int32(target > 0)
loss = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(logits=y, targets=target))
correct_prediction = tf.equal(y_bin, t_bin)
accuracy = tf.reduce_mean(tf.cast(correct_prediction, "float"))
return y, loss, accuracy
with tf.Session() as sess:
# Placeholders to represent one input/target pair from a data store.
ds_inpt = tf.placeholder(tf.float32, shape=[some_shape])
ds_trgt = tf.placeholder(tf.float32, shape=[])
queue = tf.FIFOQueue(capacity=10000, dtypes=[tf.float32, tf.float32],
shapes=[[], [some_shape], shared_name="FIFO", name="FIFO")
# enqueuing, this will be used in the async loading.
enqueue_op = queue.enqueue([ds_trgt, ds_inpt])
# dequeue from queue q, with batch size btsz
q_trgt, q_inpt = queue.dequeue_many(btsz)
# Paramters for Logistic Regression
# two functions that build shared variables and initialize these
W = weight_variable([some_shape, 1])
b = bias_variable([1])
# training model, feed from dequeuing the async queue
y, loss, accuracy = log_reg(input=q_inpt, target=q_trgt, W=W, b=b)
train_step = tf.train.AdamOptimizer(learning_rate=0.001).minimize(loss)
# inputs for validation models
val_inpt = tf.placeholder(tf.float32, shape=[btsz, some_shape])
val_trgt = tf.placeholder(tf.float32, shape=[btsz])
# validation model
val_y, val_loss, val_accuracy = log_reg(input=val_inpt, target=val_trgt, W=W, b=b)
sess.run(tf.initialize_all_variables())
try:
coord = tf.train.Coordinator()
# Start a thread to enqueue data asynchronously, and hide I/O latency.
t = threading.Thread(target=async_load,
args=(coord, sess, enqueue_op, ds_train
ds_inpt, ds_trgt))
t.start()
# collect loss/accuracy for training
# and losses for validation/test sets.
tr_loss = []
tr_acc = []
v_loss = []
for it in xrange(iters):
_, _loss, _acc = sess.run([train_step, loss, accuracy])
tr_loss.append(_loss)
tr_acc.append(_acc)
if (it+1)%stop == 0:
# run trained model on validation set 1
tmp = evaluate(sess=sess, data=ds_val1,
tf_inpt=val_inpt, tf_trgt=val_trgt,
tf_loss=val_loss, btsz)
v_loss.append(tmp)
# run trained model on validation set 2
tmp = evaluate(sess=sess, data=ds_val2,
tf_inpt=val_inpt, tf_trgt=val_trgt,
tf_loss=val_loss, btsz)
v_loss.append(tmp)
# run trained model on validation set 3
tmp = evaluate(sess=sess, data=ds_val3,
tf_inpt=val_inpt, tf_trgt=val_trgt,
tf_loss=val_loss, btsz)
v_loss.append(tmp)
coord.request_stop()
coord.join([t])
except RuntimeError as rte:
print("Caught {}".format(rte))
# Clear everything!
tf.reset_default_graph()
您的代码中存在竞争条件。如果发生以下事件,线程 运行ning async_load()
将永远阻塞:
async_load()
调用 coord.should_stop()
其中 returns False
.
async_load()
调用 session.run(queue, ...)
但队列已满,因此调用无限期阻塞。
- 主线程调用
coord.request_stop()
.
- 主线程调用
coord.join([t])
,并且由于 (2) 而永远阻塞。
避免这种情况的一种方法是创建一个 queue.close(cancel_pending_enqueues=True)
操作,并在调用 coord.request_stop()
之前在主线程中 运行 它。这将解锁 async_load()
线程,并启用 coord.join([t])
到 return。
在
这不知何故有效,但训练完成后,程序挂起(在coord.join()
)并最终抛出异常:Coordinator stopped with threads still running:...
然后异步加载线程也抛出异常。 coordinator
异常可以通过 try/except
子句解决(见下面的代码),但异步线程仍然抛出异常(虽然这不会妨碍主程序,但我认为不应该发生---它有 while
循环,应该告诉它停止)。
有趣的是,如果在没有任何评估代码运行的情况下进行训练(即if (it+1)%stop == 0:
之后的块被注释掉),那么coord.join()
根本不会挂起。
我的问题:我在这里做错了什么?好像 .request_stop()
没有按照我希望的那样做?
import tensorflow as tf
import numpy as np
# some parameters
btsz = 100 # batch size
some_shape = 20 # size of one input (no of dims)
iters = 1000 # that many single training steps
ith = 10 # run validation sets every so often
# datastores (sort of complex backends, SQL like)
ds_train = ... # the one for training
ds_val1, ds_val2, ds_val3 = ... # having the validation data
def async_load(coord, session, queue, datastore,
tf_input, tf_target):
"""
Feed queue in async way. Inputs can be extracted
from datastore only one row at a time.
"""
while not coord.should_stop():
input = extract_one_input_as_numpy(datastore)
target = extract_numpy_from(datastore) # either 0 or 1
session.run(queue, feed_dict={tf_input: input, tf_target: target})
def evaluate(sess, datastore, tf_input, tf_target, tf_loss, btsz):
"""
Evaluate current model (represented as tf_loss) on a datastore.
"""
loss = []
for i in xrange(something):
input_batch = collect_btsz_many_single examples(datastore)
target_batch = same_for_targets(datastore)
tmp, = sess.run([tf_loss], feed_dict={tf_input:input_batch, tf_target:target_batch})
loss.append(tmp)
return np.mean(loss)
def log_reg(input, target, W, b):
"""
Simple logistic regression model.
"""
y = tf.matmul(input, W) + b
y_bin = tf.to_int32(y > 0)
t_bin = tf.to_int32(target > 0)
loss = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(logits=y, targets=target))
correct_prediction = tf.equal(y_bin, t_bin)
accuracy = tf.reduce_mean(tf.cast(correct_prediction, "float"))
return y, loss, accuracy
with tf.Session() as sess:
# Placeholders to represent one input/target pair from a data store.
ds_inpt = tf.placeholder(tf.float32, shape=[some_shape])
ds_trgt = tf.placeholder(tf.float32, shape=[])
queue = tf.FIFOQueue(capacity=10000, dtypes=[tf.float32, tf.float32],
shapes=[[], [some_shape], shared_name="FIFO", name="FIFO")
# enqueuing, this will be used in the async loading.
enqueue_op = queue.enqueue([ds_trgt, ds_inpt])
# dequeue from queue q, with batch size btsz
q_trgt, q_inpt = queue.dequeue_many(btsz)
# Paramters for Logistic Regression
# two functions that build shared variables and initialize these
W = weight_variable([some_shape, 1])
b = bias_variable([1])
# training model, feed from dequeuing the async queue
y, loss, accuracy = log_reg(input=q_inpt, target=q_trgt, W=W, b=b)
train_step = tf.train.AdamOptimizer(learning_rate=0.001).minimize(loss)
# inputs for validation models
val_inpt = tf.placeholder(tf.float32, shape=[btsz, some_shape])
val_trgt = tf.placeholder(tf.float32, shape=[btsz])
# validation model
val_y, val_loss, val_accuracy = log_reg(input=val_inpt, target=val_trgt, W=W, b=b)
sess.run(tf.initialize_all_variables())
try:
coord = tf.train.Coordinator()
# Start a thread to enqueue data asynchronously, and hide I/O latency.
t = threading.Thread(target=async_load,
args=(coord, sess, enqueue_op, ds_train
ds_inpt, ds_trgt))
t.start()
# collect loss/accuracy for training
# and losses for validation/test sets.
tr_loss = []
tr_acc = []
v_loss = []
for it in xrange(iters):
_, _loss, _acc = sess.run([train_step, loss, accuracy])
tr_loss.append(_loss)
tr_acc.append(_acc)
if (it+1)%stop == 0:
# run trained model on validation set 1
tmp = evaluate(sess=sess, data=ds_val1,
tf_inpt=val_inpt, tf_trgt=val_trgt,
tf_loss=val_loss, btsz)
v_loss.append(tmp)
# run trained model on validation set 2
tmp = evaluate(sess=sess, data=ds_val2,
tf_inpt=val_inpt, tf_trgt=val_trgt,
tf_loss=val_loss, btsz)
v_loss.append(tmp)
# run trained model on validation set 3
tmp = evaluate(sess=sess, data=ds_val3,
tf_inpt=val_inpt, tf_trgt=val_trgt,
tf_loss=val_loss, btsz)
v_loss.append(tmp)
coord.request_stop()
coord.join([t])
except RuntimeError as rte:
print("Caught {}".format(rte))
# Clear everything!
tf.reset_default_graph()
您的代码中存在竞争条件。如果发生以下事件,线程 运行ning async_load()
将永远阻塞:
async_load()
调用coord.should_stop()
其中 returnsFalse
.async_load()
调用session.run(queue, ...)
但队列已满,因此调用无限期阻塞。- 主线程调用
coord.request_stop()
. - 主线程调用
coord.join([t])
,并且由于 (2) 而永远阻塞。
避免这种情况的一种方法是创建一个 queue.close(cancel_pending_enqueues=True)
操作,并在调用 coord.request_stop()
之前在主线程中 运行 它。这将解锁 async_load()
线程,并启用 coord.join([t])
到 return。