Tensorflow 队列未关闭。 tf.train.start_queue_runners(sess) 的问题

Tensor Flow queue not closing. Problems with tf.train.start_queue_runners(sess)

我正在运行一个用于测试的 CNN,但每当我尝试用 sess.close() 关闭会话,或者请求协调器(Coordinator)停止并回收(join)线程时,总会收到下面的错误。显然,会话在仍有线程运行时就试图关闭,而我找不到阻止这种情况发生的方法。或者,是否有更好/更正确的方式在 TensorFlow 中使用队列和线程...

提前致谢!

总有一堆:

2017-10-24 15:48:02.625448: W C:\tf_jenkins\home\workspace\rel-win\M\windows-gpu\PY\tensorflow\core\kernels\queue_base.cc:295] _20_input_p
roducer/input_producer: Skipping cancelled enqueue attempt with queue not closed

其次是:

ERROR:tensorflow:Exception in QueueRunner: Enqueue operation was cancelled
         [[Node: batch/fifo_queue_enqueue = QueueEnqueueV2[Tcomponents=[DT_FLOAT, DT_FLOAT], timeout_ms=-1, _device="/job:localhost/replica:0
/task:0/cpu:0"](batch/fifo_queue, Squeeze_1/_13, input_producer_1/Gather_1/_15)]]
Traceback (most recent call last):
  File "<stdin>", line 30, in <module>
ERROR:tensorflow:Exception in QueueRunner: Enqueue operation was cancelled
         [[Node: batch_1/fifo_queue_enqueue = QueueEnqueueV2[Tcomponents=[DT_FLOAT, DT_FLOAT], timeout_ms=-1, _device="/job:localhost/replica
:0/task:0/cpu:0"](batch_1/fifo_queue, Squeeze/_37, input_producer/Gather_1/_39)]]
ERROR:tensorflow:Exception in QueueRunner: Enqueue operation was cancelled
         [[Node: batch_1/fifo_queue_enqueue = QueueEnqueueV2[Tcomponents=[DT_FLOAT, DT_FLOAT], timeout_ms=-1, _device="/job:localhost/replica
:0/task:0/cpu:0"](batch_1/fifo_queue, Squeeze/_37, input_producer/Gather_1/_39)]]
Exception in thread Thread-53:
Traceback (most recent call last):
  File "C:\Program Files\Anaconda3\lib\threading.py", line 916, in _bootstrap_inner
    self.run()
  File "C:\Program Files\Anaconda3\lib\threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Program Files\Anaconda3\lib\site-packages\tensorflow\python\training\queue_runner_impl.py", line 238, in _run
    enqueue_callable()
  File "C:\Program Files\Anaconda3\lib\site-packages\tensorflow\python\client\session.py", line 1235, in _single_operation_run
    target_list_as_strings, status, None)
  File "C:\Program Files\Anaconda3\lib\contextlib.py", line 89, in __exit__
    next(self.gen)
  File "C:\Program Files\Anaconda3\lib\site-packages\tensorflow\python\framework\errors_impl.py", line 466, in raise_exception_on_not_ok_stat
us

下面是根据 TF 手册和 GitHub 上的例子编写的代码:
"""My general framework to construct a TensorFlow data set of images for regression.

The general idea is to create a list of image names (i.e. the path to each image).
The image list must also have labels. In the case of a regression these can be multiple variables.
"""
import csv
import os
import sys
import plotly as py
import plotly.graph_objs as go
import math
import numpy as np
import tensorflow as tf
# Settings for loading the images and their labels.
# Number of channels requested when decoding each image.
chn = 1
# Nominal image height and width before rescaling.
im_h, im_w = 424, 511
# Scale factor applied to both image dimensions when resizing.
size = 0.1
# Fraction of the samples held out as the test set.
p = 0.25

def imtensors(im_path, chn, im_h, im_w, size):
    """Read a PNG file and return it resized by the factor ``size``.

    Args:
        im_path: path of the image file, passed to ``tf.read_file``.
        chn: number of channels requested from ``tf.image.decode_png``.
        im_h: reference height; the output height is ``int(im_h * size)``.
        im_w: reference width; the output width is ``int(im_w * size)``.
        size: scale factor applied to both dimensions.

    Returns:
        The decoded image tensor after ``tf.image.resize_images``.
    """
    target_shape = [int(im_h * size), int(im_w * size)]
    png_bytes = tf.read_file(im_path)
    decoded = tf.image.decode_png(png_bytes, channels=chn)
    return tf.image.resize_images(decoded, target_shape)


dbname = 'simpRDB.csv'
imagepaths, y = [], []
# Read the CSV as dictionaries: column 'path' holds the image path and
# column 'w' the regression target.
with open(dbname, newline='') as csvfile:
    for row in csv.DictReader(csvfile):
        imagepaths.append(row['path'])
        y.append(float(row['w']))

n = len(y)
ntest = int(n * p)
# Slices are half-open: x[a:d] yields a, b, c. The test set therefore ends at
# index ntest - 1 and the training set must start at ntest; starting at
# ntest + 1 would silently drop one sample from both sets.
impath_test = imagepaths[:ntest]
y_test = y[:ntest]
impath_train = imagepaths[ntest:]
y_train = y[ntest:]

# Turn the Python lists into tensors and build one input queue per data set.
# Each slice_input_producer yields a single (path, label) pair; the path is
# then replaced by the decoded, resized image.

# Test set.
impath_test = tf.convert_to_tensor(impath_test, dtype=tf.string)
y_test = tf.convert_to_tensor(y_test, dtype=tf.float32)
test_path_item, y_test = tf.train.slice_input_producer(
    tensor_list=[impath_test, y_test])
im_test = imtensors(im_path=test_path_item, chn=chn, im_h=im_h, im_w=im_w, size=size)

# Training set.
impath_train = tf.convert_to_tensor(impath_train, dtype=tf.string)
y_train = tf.convert_to_tensor(y_train, dtype=tf.float32)
train_path_item, y_train = tf.train.slice_input_producer(
    tensor_list=[impath_train, y_train])
im_train = imtensors(im_path=train_path_item, chn=chn, im_h=im_h, im_w=im_w, size=size)


##################################################
# -----------------------------------------------
# This is a classic CNN with some spice.
# The basic change is at the output node. Instead of
# using a softmax or another multiclass output, we are
# using a fully connected regression estimator as the
# last layer of nodes.
# -----------------------------------------------

# Training hyper-parameters
learning_rate = 1e-3   # step size handed to the optimizer
num_steps = 10000      # number of training iterations
b_size = 8             # examples per batch
display_step = 100     # report the losses every this many steps

# Network hyper-parameters
# Drop rate (fraction of inputs dropped) used by the dropout layer.
dropout = 0.3

# Batched training and test inputs.
# Batches are kept small because the CPU/GPU can run out of memory.
# Both queues share the same batching options.
_batch_opts = dict(batch_size=b_size, capacity=b_size * 4, num_threads=4,
                   allow_smaller_final_batch=True)

X, Y = tf.train.batch([im_train, y_train], **_batch_opts)

X_test, Y_test = tf.train.batch([im_test, y_test], **_batch_opts)

#First let's define the weights and biases in a more systematic fashion.
#The weights are going to be initialized as random weights with values near zero. This is a good
#practice for neural nets in general
# Height and width of the network input: the reference size scaled by `size`.
inp_h, inp_w = int(im_h * size), int(im_w * size)

# Placeholders for model checking: single-channel images of the input size
# and one target value per example.
x = tf.placeholder(dtype=tf.float32, shape=[None, inp_h, inp_w, 1])
y_ = tf.placeholder(dtype=tf.float32, shape=[None, 1])

def weight_variable(shape):
    """Return a ``tf.Variable`` of ``shape`` drawn from a truncated normal (stddev 0.1)."""
    return tf.Variable(tf.truncated_normal(shape, stddev=0.1))

# Biases start slightly positive (0.1): with ReLU activations this is the
# usual way to avoid "dead neurons".
def bias_variable(shape):
    """Return a ``tf.Variable`` of ``shape`` filled with the constant 0.1."""
    return tf.Variable(tf.constant(0.1, shape=shape))

# Convolution layer: the filter moves one step at a time in every dimension
# (strides of 1) and 'SAME' padding is used, so the output keeps the spatial
# size of the input.
def conv2d(x, W):
    """Convolve ``x`` with the filter bank ``W`` using unit strides and SAME padding."""
    unit_strides = [1, 1, 1, 1]
    return tf.nn.conv2d(x, W, strides=unit_strides, padding='SAME')

# Pooling layer: non-overlapping 2x2 blocks placed side by side.
def max_pool_2x2(x):
    """Max-pool ``x`` over 2x2 windows with a stride of 2 and SAME padding."""
    window = [1, 2, 2, 1]
    return tf.nn.max_pool(x, ksize=window, strides=window, padding='SAME')

# Weights and biases of every layer, created in network order.

# First convolution: 5x5 patches, 1 input channel, 32 feature maps.
W_conv1 = weight_variable([5, 5, 1, 32])
b_conv1 = bias_variable([32])

# Second convolution: 5x5 patches, 32 input channels, 64 feature maps.
W_conv2 = weight_variable([5, 5, 32, 64])
b_conv2 = bias_variable([64])


def _halve_ceil(length):
    """Return ``length / 2`` rounded up (size after one 2x2 SAME max-pool)."""
    return math.ceil(length / 2)


# Two 2x2 poolings shrink each spatial dimension by a factor of 4; the
# rounding has to be a ceiling at each step to match the SAME padding.
h_val = _halve_ceil(_halve_ceil(inp_h))
w_val = _halve_ceil(_halve_ceil(inp_w))

# Fully connected layer on the flattened feature maps.
W_fc1 = weight_variable([h_val * w_val * 64, 1024])
b_fc1 = bias_variable([1024])

# Dense layer reducing 1024 units to 100.
W_d1 = weight_variable([1024, 100])
b_d1 = bias_variable([100])

# Output layer: a single regression value.
w_out = weight_variable([100, 1])
b_out = bias_variable([1])

# Create model
def conv_net(x, dropout, reuse, is_training):
    """Build the regression CNN on top of the module-level weights and biases.

    Args:
        x: batch of input images fed to the first convolution.
        dropout: drop rate handed to ``tf.layers.dropout``.
        reuse: forwarded to ``tf.variable_scope('ConvNet', reuse=...)``.
        is_training: dropout is only applied when this is True.

    Returns:
        The output of the last linear layer, one value per example.
    """
    with tf.variable_scope('ConvNet', reuse=reuse):
        # Stage 1: convolution with 32 feature maps, ReLU, then 2x2 max-pooling.
        act1 = tf.nn.relu(conv2d(x, W_conv1) + b_conv1)
        down1 = max_pool_2x2(act1)

        # Stage 2: convolution with 64 feature maps, ReLU, then 2x2 max-pooling.
        act2 = tf.nn.relu(conv2d(down1, W_conv2) + b_conv2)
        down2 = max_pool_2x2(act2)

        # Flatten the feature maps to one vector per example and apply the
        # fully connected layer (1024 units).
        flat = tf.contrib.layers.flatten(down2)
        hidden = tf.nn.relu(tf.matmul(flat, W_fc1) + b_fc1)

        # Dropout; skipped when is_training is False.
        hidden = tf.layers.dropout(hidden, rate=dropout, training=is_training)

        # Dense layer shrinking the representation before the prediction.
        reduced = tf.nn.relu(tf.matmul(hidden, W_d1) + b_d1)

        # Single linear output, i.e. regression.
        return tf.matmul(reduced, w_out) + b_out

# Build one graph for training and one for evaluation; both use the same weights.
out_train = conv_net(X, dropout, reuse=False, is_training=True)
# No dropout at evaluation.
out_test = conv_net(X_test, dropout, reuse=True, is_training=False)

# Define loss and optimizer.
# The loss is the mean absolute error. The network output has shape
# [batch, 1] while the batched labels have shape [batch]; subtracting them
# directly would broadcast to [batch, batch] and average the error over every
# prediction/label pair. Squeezing the last axis keeps the difference
# element-wise.
loss_op = tf.reduce_mean(tf.abs(tf.squeeze(out_train, axis=1) - Y))
loss_ts = tf.reduce_mean(tf.abs(tf.squeeze(out_test, axis=1) - Y_test))
optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate)
train_op = optimizer.minimize(loss_op)
# Loss history for training and testing, kept for plotting later.
losses_op = []
losses_ts = []
steps = []
# Initialize the variables (i.e. assign their default value)
init = tf.global_variables_initializer()
# Saver object
saver = tf.train.Saver()
##################
# Output location. It is resolved before the session starts because
# saver.save() in the training block needs model_path; defining it after the
# session raises NameError inside the `finally` clause.
m_name = 'bw_model_1'
save_path = os.path.join(os.getcwd(), 'bw_models')
os.makedirs(save_path, exist_ok=True)
model_path = os.path.join(save_path, m_name)

# Start training
with tf.Session() as sess:
    # Run the initializer
    sess.run(init)
    # Start the data queues. The coordinator must be handed to the queue
    # runners: otherwise request_stop() never reaches their threads and they
    # are still enqueuing when the session closes ("Enqueue operation was
    # cancelled").
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    # Training cycle
    try:
        for step in range(1, num_steps + 1):
            # Leave early if a queue-runner thread asked to stop.
            if coord.should_stop():
                break
            if step % display_step == 0:
                # Run one optimization step and fetch the training and test
                # batch losses.
                _, loss, loss2 = sess.run([train_op, loss_op, loss_ts])
                print("Step {}, Minibatch Loss = {:.4f}, Loss Testbatch = {:.4f}".format(
                    step, loss, loss2))
                steps.append(step)
                losses_op.append(loss)
                losses_ts.append(loss2)
            else:
                # Only run the optimization op (backprop)
                sess.run(train_op)
        print("Optimization Finished!")
    except Exception as e:
        # Report the failure to the coordinator; coord.join() re-raises it.
        coord.request_stop(e)
    finally:
        # Calling request_stop() a second time is harmless.
        coord.request_stop()
        coord.join(threads)
        saver.save(sess, model_path)

# Plot of the losses over the training steps.
p_loss1 = go.Scatter(
    x=steps,
    y=losses_op,
    mode='lines',
    name='Loss training minibatch'
)
p_loss2 = go.Scatter(
    x=steps,
    y=losses_ts,
    mode='lines',
    name='Loss evaluation minibatch'
)
data = [p_loss1, p_loss2]

# Save the loss-over-time graph next to the model checkpoint.
py.offline.plot(data, filename=model_path)



#end

对不起我的愚蠢!哈哈

我发现了一个大错误:问题在于我创建了协调器(Coordinator),却没有把它传给队列运行器(queue runners)。原来的代码是这样的:

# Start the data queue
# NOTE: the coordinator is created here but not passed to
# start_queue_runners(), so the runner threads are not tied to its stop request.
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess)

应该是这样的:

# Start the data queue
# The coordinator is passed as the second argument to start_queue_runners().
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess, coord)

现在代码会在结束时正常停止线程并将其回收(join)!