Tensorflow 队列未关闭。 tf.train.start_queue_runners(sess) 的问题
Tensor Flow queue not closing. Problems with tf.train.start_queue_runners(sess)
虽然 运行 是一个测试 CNN,但是当我尝试使用 sess.close()
关闭会话或请求脚趾协调器停止并收集线程时,我总是会收到此错误。显然,会话试图在仍有线程 运行 时关闭。我只是找不到阻止这种情况发生的方法。或者如果有 better/correct 在张量流中使用队列和线程的方法...
提前致谢!
总有一堆:
2017-10-24 15:48:02.625448: W C:\tf_jenkins\home\workspace\rel-win\M\windows-gpu\PY\tensorflow\core\kernels\queue_base.cc:295] _20_input_p
roducer/input_producer: Skipping cancelled enqueue attempt with queue not closed
其次是:
ERROR:tensorflow:Exception in QueueRunner: Enqueue operation was cancelled
[[Node: batch/fifo_queue_enqueue = QueueEnqueueV2[Tcomponents=[DT_FLOAT, DT_FLOAT], timeout_ms=-1, _device="/job:localhost/replica:0
/task:0/cpu:0"](batch/fifo_queue, Squeeze_1/_13, input_producer_1/Gather_1/_15)]]
Traceback (most recent call last):
File "<stdin>", line 30, in <module>
ERROR:tensorflow:Exception in QueueRunner: Enqueue operation was cancelled
[[Node: batch_1/fifo_queue_enqueue = QueueEnqueueV2[Tcomponents=[DT_FLOAT, DT_FLOAT], timeout_ms=-1, _device="/job:localhost/replica
:0/task:0/cpu:0"](batch_1/fifo_queue, Squeeze/_37, input_producer/Gather_1/_39)]]
ERROR:tensorflow:Exception in QueueRunner: Enqueue operation was cancelled
[[Node: batch_1/fifo_queue_enqueue = QueueEnqueueV2[Tcomponents=[DT_FLOAT, DT_FLOAT], timeout_ms=-1, _device="/job:localhost/replica
:0/task:0/cpu:0"](batch_1/fifo_queue, Squeeze/_37, input_producer/Gather_1/_39)]]
Exception in thread Thread-53:
Traceback (most recent call last):
File "C:\Program Files\Anaconda3\lib\threading.py", line 916, in _bootstrap_inner
self.run()
File "C:\Program Files\Anaconda3\lib\threading.py", line 864, in run
self._target(*self._args, **self._kwargs)
File "C:\Program Files\Anaconda3\lib\site-packages\tensorflow\python\training\queue_runner_impl.py", line 238, in _run
enqueue_callable()
File "C:\Program Files\Anaconda3\lib\site-packages\tensorflow\python\client\session.py", line 1235, in _single_operation_run
target_list_as_strings, status, None)
File "C:\Program Files\Anaconda3\lib\contextlib.py", line 89, in __exit__
next(self.gen)
File "C:\Program Files\Anaconda3\lib\site-packages\tensorflow\python\framework\errors_impl.py", line 466, in raise_exception_on_not_ok_stat
us
下面是根据tf手册和GitHub:
的例子编写的代码
"""My general framework to construct a tensor flow data set of images for regression.
The genetal idea is to: create a list of image names (i.e the path to each image).
The image list must have also labels. In the case of a regression this can be multiple variables.
"""
import csv
import os
import sys
import plotly as py
import plotly.graph_objs as go
import math
import numpy as np
import tensorflow as tf
#First neet to get image paths and their respective labels
chn = 1
im_h = 424
im_w = 511
#resize image
size = 0.1
#size of test set
p = 0.25
def imtensors(im_path, chn, im_h, im_w, size):
im_h = int(im_h*size)
im_w = int(im_w*size)
ima_tensors = tf.read_file(im_path)
ima_tensors = tf.image.decode_png(ima_tensors, channels=chn)
ima_tensors = tf.image.resize_images(ima_tensors, [im_h, im_w])
return ima_tensors
dbname = 'simpRDB.csv'
imagepaths, y = list(), list()
#read the csv as a dictionary
with open(dbname, newline='') as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
imagepaths.append(row['path'])
y.append(float(row['w']))
n = len(y)
ntest = int(n*p)
#remember that in py the index starts at 0 and x[a:d] -> a,b,c
impath_test = imagepaths[0:ntest]
y_test = y[0:ntest]
impath_train = imagepaths[ntest+1:n]
y_train = y[ntest+1:n]
#now convert to tensors
impath_test = tf.convert_to_tensor(impath_test, dtype=tf.string)
y_test = tf.convert_to_tensor(y_test, dtype=tf.float32)
im_test, y_test = tf.train.slice_input_producer([impath_test, y_test])
im_test = imtensors(im_test, chn, im_h, im_w, size)
impath_train = tf.convert_to_tensor(impath_train, dtype=tf.string)
y_train = tf.convert_to_tensor(y_train, dtype=tf.float32)
im_train, y_train = tf.train.slice_input_producer([impath_train, y_train])
im_train = imtensors(im_train, chn, im_h, im_w, size)
##################################################
# -----------------------------------------------
# This is a classic CNN with some spice.
# The basic change isat the output node. Instead of
# use a softmax or other multiclass we a re using a
# a fully regressor estimator to the last layer of
# nodes.
# -----------------------------------------------
# Parameters
learning_rate = 0.001
num_steps = 10000
b_size = 8
display_step = 100
# Network Parameters
dropout = 0.3 # rate to drop input
#create batched train set
#Use small batchs because CPU/GPU can run out of memory
X, Y = tf.train.batch([im_train, y_train], batch_size=b_size, capacity=b_size*4, num_threads=4,
allow_smaller_final_batch=True)
X_test, Y_test = tf.train.batch([im_test, y_test], batch_size=b_size, capacity=b_size*4,
num_threads=4, allow_smaller_final_batch=True)
#First lets define the weights and bias in a more sistematic fashion.
#The weights are going to be initialized as random weights wit values near zero. This is a good
#practice for neuralnets in general
inp_h = int(im_h*size)
inp_w = int(im_w*size)
#placeholders for model cheking
x = tf.placeholder(tf.float32, shape=[None, inp_h, inp_w, 1])
y_ = tf.placeholder(tf.float32, shape=[None, 1])
def weight_variable(shape):
initial = tf.truncated_normal(shape, stddev=0.1)
return tf.Variable(initial)
#The bias neurons are normaly initialized slightly positive for a ReLU activation function in order
# to prevent "dead neurons"
def bias_variable(shape):
initial = tf.constant(0.1, shape=shape)
return tf.Variable(initial)
#Now convolution layers: They are going to have size one with zero pad.
# The arguent strides deifne the size of the window. So the output is of the same size.
def conv2d(x, W):
return tf.nn.conv2d(x, W, strides=[1, 1, 1, 1], padding='SAME')
#And the polling are going to be 2x2 blocks side by side.
def max_pool_2x2(x):
return tf.nn.max_pool(x, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')
#Define weights and biases
W_conv1 = weight_variable([5, 5, 1, 32])
b_conv1 = bias_variable([32])
W_conv2 = weight_variable([5, 5, 32, 64])
b_conv2 = bias_variable([64])
#after 2 max_poling 2x2 the 'image'size is reduce by 4. Need to use ceil (intrinsics of TF)
h_val = math.ceil(math.ceil(inp_h/2)/2)
w_val = math.ceil(math.ceil(inp_w/2)/2)
W_fc1 = weight_variable([h_val * w_val * 64, 1024])
b_fc1 = bias_variable([1024])
W_d1 = weight_variable([1024, 100])
b_d1 = bias_variable([100])
w_out = weight_variable([100, 1])
b_out = bias_variable([1])
# Create model
def conv_net(x, dropout, reuse, is_training):
# Define a scope for reusing the variables
with tf.variable_scope('ConvNet', reuse=reuse):
#no need for dropout if evaluating the model
# Convolution Layer with 32 filters and a kernel size of
conv1 = tf.nn.relu(conv2d(x, W_conv1) + b_conv1)
# Max Pooling (down-sampling) with strides of 2 and kernel size of 2
pool1 = max_pool_2x2(conv1)
#In order to build a deep network, we stack several layers of this type.
# The second layer will have 64 features for each 5x5 patch.
conv2 = tf.nn.relu(conv2d(pool1, W_conv2) + b_conv2)
# Max Pooling (down-sampling) with strides of 2 and kernel size of 2
pool2 = max_pool_2x2(conv2)
#DENSELY CONNECTED LAYER
#Now that the image size has been reduced to nxm, we add a fully-connected layer
# with 1024(32X32) neurons to allow processing on the entire image.
# Flatten the data to a 1-D vector for the fully connected layer
fc1 = tf.contrib.layers.flatten(pool2)
# Fully connected layer
fc1_flat = tf.nn.relu(tf.matmul(fc1, W_fc1) + b_fc1)
# Apply Dropout (if is_training is False, dropout is not applied)
fc1_drop = tf.layers.dropout(fc1_flat, rate=dropout, training=is_training)
# dense layer with size reduction for input to final prediction layer
d1 = tf.nn.relu(tf.matmul(fc1_drop, W_d1) + b_d1)
# Only one output aka regression
out = tf.matmul(d1, w_out)+b_out
return out
#Create graph for trainig and a graph for prediction sharing the same weights
out_train = conv_net(X, dropout, reuse=False, is_training=True)
#no drop out at evaluation
out_test = conv_net(X_test, dropout, reuse=True, is_training=False)
# Define loss and optimizer.
#reduce absulute sum of squares
loss_op = tf.reduce_mean(tf.abs(out_train - Y))
loss_ts = tf.reduce_mean(tf.abs(out_test - Y_test))
optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate)
train_op = optimizer.minimize(loss_op)
#Save the loss for training and testing for plot latter
losses_op = []
losses_ts = []
steps = []
# Initialize the variables (i.e. assign their default value)
init = tf.global_variables_initializer()
# Saver object
saver = tf.train.Saver()
##################
# Start training
with tf.Session() as sess:
# Run the initializer
sess.run(init)
# Start the data queue
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess)
# Training cycle
try:
for step in range(1, num_steps+1):
if step % display_step == 0:
# Run optimization and calculate batch loss and accuracy
_, loss, loss2 = sess.run([train_op, loss_op, loss_ts])
print("Step " + str(step) + ", Minibatch Loss = " + \
"{:.4f}".format(loss) + ", Loss Testbatch = " + "{:.4f}".format(loss2))
steps.append(step)
losses_op.append(loss)
losses_ts.append(loss2)
else:
# Only run the optimization op (backprop)
sess.run(train_op)
print("Optimization Finished!")
except Exception as e:
coord.request_stop(e)
finally:
#Something about it is safer to call twice
coord.request_stop()
coord.join(threads)
saver.save(sess, model_path)
#plota of loss over steps
p_loss1 = go.Scatter(
x=steps,
y=losses_op,
mode='lines',
name='Loss training minibatch'
)
p_loss2 = go.Scatter(
x=steps,
y=losses_ts,
mode='lines',
name='Loss evaluation minibatch'
)
data = [p_loss1, p_loss2]
# Save your model
#Loss of final model on the test set
#no dropout for the test set
m_name = 'bw_model_1'
save_path = os.path.join(os.getcwd(), 'bw_models')
if not os.path.exists(save_path):
os.mkdir(save_path)
model_path = os.path.join(save_path, m_name)
#save a graph of the loss over time
py.offline.plot(data, filename=model_path)
#end
对不起我的愚蠢!哈哈
发现一个大错误,问题是我创建了协调器但没有在队列运行器中调用它。是这样的:
# Start the data queue
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess)
应该是这样的:
# Start the data queue
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess, coord)
现在代码正在停止线程并在最后收集它!
虽然 运行 是一个测试 CNN,但是当我尝试使用 sess.close()
关闭会话或请求脚趾协调器停止并收集线程时,我总是会收到此错误。显然,会话试图在仍有线程 运行 时关闭。我只是找不到阻止这种情况发生的方法。或者如果有 better/correct 在张量流中使用队列和线程的方法...
提前致谢!
总有一堆:
2017-10-24 15:48:02.625448: W C:\tf_jenkins\home\workspace\rel-win\M\windows-gpu\PY\tensorflow\core\kernels\queue_base.cc:295] _20_input_p
roducer/input_producer: Skipping cancelled enqueue attempt with queue not closed
其次是:
ERROR:tensorflow:Exception in QueueRunner: Enqueue operation was cancelled
[[Node: batch/fifo_queue_enqueue = QueueEnqueueV2[Tcomponents=[DT_FLOAT, DT_FLOAT], timeout_ms=-1, _device="/job:localhost/replica:0
/task:0/cpu:0"](batch/fifo_queue, Squeeze_1/_13, input_producer_1/Gather_1/_15)]]
Traceback (most recent call last):
File "<stdin>", line 30, in <module>
ERROR:tensorflow:Exception in QueueRunner: Enqueue operation was cancelled
[[Node: batch_1/fifo_queue_enqueue = QueueEnqueueV2[Tcomponents=[DT_FLOAT, DT_FLOAT], timeout_ms=-1, _device="/job:localhost/replica
:0/task:0/cpu:0"](batch_1/fifo_queue, Squeeze/_37, input_producer/Gather_1/_39)]]
ERROR:tensorflow:Exception in QueueRunner: Enqueue operation was cancelled
[[Node: batch_1/fifo_queue_enqueue = QueueEnqueueV2[Tcomponents=[DT_FLOAT, DT_FLOAT], timeout_ms=-1, _device="/job:localhost/replica
:0/task:0/cpu:0"](batch_1/fifo_queue, Squeeze/_37, input_producer/Gather_1/_39)]]
Exception in thread Thread-53:
Traceback (most recent call last):
File "C:\Program Files\Anaconda3\lib\threading.py", line 916, in _bootstrap_inner
self.run()
File "C:\Program Files\Anaconda3\lib\threading.py", line 864, in run
self._target(*self._args, **self._kwargs)
File "C:\Program Files\Anaconda3\lib\site-packages\tensorflow\python\training\queue_runner_impl.py", line 238, in _run
enqueue_callable()
File "C:\Program Files\Anaconda3\lib\site-packages\tensorflow\python\client\session.py", line 1235, in _single_operation_run
target_list_as_strings, status, None)
File "C:\Program Files\Anaconda3\lib\contextlib.py", line 89, in __exit__
next(self.gen)
File "C:\Program Files\Anaconda3\lib\site-packages\tensorflow\python\framework\errors_impl.py", line 466, in raise_exception_on_not_ok_stat
us
下面是根据tf手册和GitHub:
的例子编写的代码"""My general framework to construct a tensor flow data set of images for regression.
The genetal idea is to: create a list of image names (i.e the path to each image).
The image list must have also labels. In the case of a regression this can be multiple variables.
"""
import csv
import os
import sys
import plotly as py
import plotly.graph_objs as go
import math
import numpy as np
import tensorflow as tf
#First neet to get image paths and their respective labels
chn = 1
im_h = 424
im_w = 511
#resize image
size = 0.1
#size of test set
p = 0.25
def imtensors(im_path, chn, im_h, im_w, size):
im_h = int(im_h*size)
im_w = int(im_w*size)
ima_tensors = tf.read_file(im_path)
ima_tensors = tf.image.decode_png(ima_tensors, channels=chn)
ima_tensors = tf.image.resize_images(ima_tensors, [im_h, im_w])
return ima_tensors
dbname = 'simpRDB.csv'
imagepaths, y = list(), list()
#read the csv as a dictionary
with open(dbname, newline='') as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
imagepaths.append(row['path'])
y.append(float(row['w']))
n = len(y)
ntest = int(n*p)
#remember that in py the index starts at 0 and x[a:d] -> a,b,c
impath_test = imagepaths[0:ntest]
y_test = y[0:ntest]
impath_train = imagepaths[ntest+1:n]
y_train = y[ntest+1:n]
#now convert to tensors
impath_test = tf.convert_to_tensor(impath_test, dtype=tf.string)
y_test = tf.convert_to_tensor(y_test, dtype=tf.float32)
im_test, y_test = tf.train.slice_input_producer([impath_test, y_test])
im_test = imtensors(im_test, chn, im_h, im_w, size)
impath_train = tf.convert_to_tensor(impath_train, dtype=tf.string)
y_train = tf.convert_to_tensor(y_train, dtype=tf.float32)
im_train, y_train = tf.train.slice_input_producer([impath_train, y_train])
im_train = imtensors(im_train, chn, im_h, im_w, size)
##################################################
# -----------------------------------------------
# This is a classic CNN with some spice.
# The basic change isat the output node. Instead of
# use a softmax or other multiclass we a re using a
# a fully regressor estimator to the last layer of
# nodes.
# -----------------------------------------------
# Parameters
learning_rate = 0.001
num_steps = 10000
b_size = 8
display_step = 100
# Network Parameters
dropout = 0.3 # rate to drop input
#create batched train set
#Use small batchs because CPU/GPU can run out of memory
X, Y = tf.train.batch([im_train, y_train], batch_size=b_size, capacity=b_size*4, num_threads=4,
allow_smaller_final_batch=True)
X_test, Y_test = tf.train.batch([im_test, y_test], batch_size=b_size, capacity=b_size*4,
num_threads=4, allow_smaller_final_batch=True)
#First lets define the weights and bias in a more sistematic fashion.
#The weights are going to be initialized as random weights wit values near zero. This is a good
#practice for neuralnets in general
inp_h = int(im_h*size)
inp_w = int(im_w*size)
#placeholders for model cheking
x = tf.placeholder(tf.float32, shape=[None, inp_h, inp_w, 1])
y_ = tf.placeholder(tf.float32, shape=[None, 1])
def weight_variable(shape):
initial = tf.truncated_normal(shape, stddev=0.1)
return tf.Variable(initial)
#The bias neurons are normaly initialized slightly positive for a ReLU activation function in order
# to prevent "dead neurons"
def bias_variable(shape):
initial = tf.constant(0.1, shape=shape)
return tf.Variable(initial)
#Now convolution layers: They are going to have size one with zero pad.
# The arguent strides deifne the size of the window. So the output is of the same size.
def conv2d(x, W):
return tf.nn.conv2d(x, W, strides=[1, 1, 1, 1], padding='SAME')
#And the polling are going to be 2x2 blocks side by side.
def max_pool_2x2(x):
return tf.nn.max_pool(x, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')
#Define weights and biases
W_conv1 = weight_variable([5, 5, 1, 32])
b_conv1 = bias_variable([32])
W_conv2 = weight_variable([5, 5, 32, 64])
b_conv2 = bias_variable([64])
#after 2 max_poling 2x2 the 'image'size is reduce by 4. Need to use ceil (intrinsics of TF)
h_val = math.ceil(math.ceil(inp_h/2)/2)
w_val = math.ceil(math.ceil(inp_w/2)/2)
W_fc1 = weight_variable([h_val * w_val * 64, 1024])
b_fc1 = bias_variable([1024])
W_d1 = weight_variable([1024, 100])
b_d1 = bias_variable([100])
w_out = weight_variable([100, 1])
b_out = bias_variable([1])
# Create model
def conv_net(x, dropout, reuse, is_training):
# Define a scope for reusing the variables
with tf.variable_scope('ConvNet', reuse=reuse):
#no need for dropout if evaluating the model
# Convolution Layer with 32 filters and a kernel size of
conv1 = tf.nn.relu(conv2d(x, W_conv1) + b_conv1)
# Max Pooling (down-sampling) with strides of 2 and kernel size of 2
pool1 = max_pool_2x2(conv1)
#In order to build a deep network, we stack several layers of this type.
# The second layer will have 64 features for each 5x5 patch.
conv2 = tf.nn.relu(conv2d(pool1, W_conv2) + b_conv2)
# Max Pooling (down-sampling) with strides of 2 and kernel size of 2
pool2 = max_pool_2x2(conv2)
#DENSELY CONNECTED LAYER
#Now that the image size has been reduced to nxm, we add a fully-connected layer
# with 1024(32X32) neurons to allow processing on the entire image.
# Flatten the data to a 1-D vector for the fully connected layer
fc1 = tf.contrib.layers.flatten(pool2)
# Fully connected layer
fc1_flat = tf.nn.relu(tf.matmul(fc1, W_fc1) + b_fc1)
# Apply Dropout (if is_training is False, dropout is not applied)
fc1_drop = tf.layers.dropout(fc1_flat, rate=dropout, training=is_training)
# dense layer with size reduction for input to final prediction layer
d1 = tf.nn.relu(tf.matmul(fc1_drop, W_d1) + b_d1)
# Only one output aka regression
out = tf.matmul(d1, w_out)+b_out
return out
#Create graph for trainig and a graph for prediction sharing the same weights
out_train = conv_net(X, dropout, reuse=False, is_training=True)
#no drop out at evaluation
out_test = conv_net(X_test, dropout, reuse=True, is_training=False)
# Define loss and optimizer.
#reduce absulute sum of squares
loss_op = tf.reduce_mean(tf.abs(out_train - Y))
loss_ts = tf.reduce_mean(tf.abs(out_test - Y_test))
optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate)
train_op = optimizer.minimize(loss_op)
#Save the loss for training and testing for plot latter
losses_op = []
losses_ts = []
steps = []
# Initialize the variables (i.e. assign their default value)
init = tf.global_variables_initializer()
# Saver object
saver = tf.train.Saver()
##################
# Start training
with tf.Session() as sess:
# Run the initializer
sess.run(init)
# Start the data queue
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess)
# Training cycle
try:
for step in range(1, num_steps+1):
if step % display_step == 0:
# Run optimization and calculate batch loss and accuracy
_, loss, loss2 = sess.run([train_op, loss_op, loss_ts])
print("Step " + str(step) + ", Minibatch Loss = " + \
"{:.4f}".format(loss) + ", Loss Testbatch = " + "{:.4f}".format(loss2))
steps.append(step)
losses_op.append(loss)
losses_ts.append(loss2)
else:
# Only run the optimization op (backprop)
sess.run(train_op)
print("Optimization Finished!")
except Exception as e:
coord.request_stop(e)
finally:
#Something about it is safer to call twice
coord.request_stop()
coord.join(threads)
saver.save(sess, model_path)
#plota of loss over steps
p_loss1 = go.Scatter(
x=steps,
y=losses_op,
mode='lines',
name='Loss training minibatch'
)
p_loss2 = go.Scatter(
x=steps,
y=losses_ts,
mode='lines',
name='Loss evaluation minibatch'
)
data = [p_loss1, p_loss2]
# Save your model
#Loss of final model on the test set
#no dropout for the test set
m_name = 'bw_model_1'
save_path = os.path.join(os.getcwd(), 'bw_models')
if not os.path.exists(save_path):
os.mkdir(save_path)
model_path = os.path.join(save_path, m_name)
#save a graph of the loss over time
py.offline.plot(data, filename=model_path)
#end
对不起我的愚蠢!哈哈
发现一个大错误,问题是我创建了协调器但没有在队列运行器中调用它。是这样的:
# Start the data queue
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess)
应该是这样的:
# Start the data queue
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess, coord)
现在代码正在停止线程并在最后收集它!