Can TensorFlow dynamically add files to the FIFOQueue during training?

I have tested code like this:



    # filename_queue comes from tf.train.string_input_producer
    features, labels, filename_queue = read_batch_data(file_list, 10)
    with tf.Session() as sess:
        init = tf.initialize_all_variables()
        sess.run(init)
        coord = tf.train.Coordinator()
        threads = tf.train.start_queue_runners(sess=sess, coord=coord)
        counter = 0
        try:
            while not coord.should_stop():
                counter = counter + 1
                value = features.eval()
                if counter % 1000 == 0:
                    # check whether new data has been inserted into the queue
                    print(counter, sum(value))
                    index = (counter // 1000) % 3
                    enqueue_op = filename_queue.enqueue(['a%d.csv' % index])
                    sess.run([enqueue_op])
        except tf.errors.OutOfRangeError:
            ...

But it looks like the graph still reads from the original set of files and never picks up the new data.

I suspect you have a large prefetch buffer still holding the old set of names, so when you add new filenames they are only seen once the prefetch buffer runs out. By default, tf.train.string_input_producer cycles over the set of names indefinitely and fills a prefetch buffer of size 32.
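The effect of that prefetch buffer can be illustrated without TensorFlow at all. The sketch below (plain Python with hypothetical names, not a TF API) models a producer thread that cycles over a filename list while keeping a bounded buffer topped up; a name appended to the list only becomes visible after the already-buffered items have been consumed, which is exactly the delay described above:

```python
import itertools
import queue
import threading
import time

def start_producer(filenames, capacity):
    """Cycle over `filenames` forever, keeping a bounded buffer topped up."""
    buf = queue.Queue(maxsize=capacity)

    def fill():
        for i in itertools.count():
            # The item is chosen first; put() then blocks while the buffer is full.
            buf.put(filenames[i % len(filenames)])

    threading.Thread(target=fill, daemon=True).start()
    return buf

names = ["a.csv", "b.csv"]
buf = start_producer(names, capacity=4)
time.sleep(0.2)           # let the producer fill the buffer completely
names.append("new.csv")   # "enqueue" a new filename mid-run

seen = [buf.get() for _ in range(8)]
# The first items all predate the append; "new.csv" only shows up after
# the buffered names have been drained.
```

With the real string_input_producer the buffer holds 32 items, so the lag before new filenames surface is correspondingly longer.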

If you want to modify the list of files, it is easier to use a FIFOQueue and fill it with filenames manually than to use string_input_producer. Be careful not to under-feed it and hang your main thread; you may want to set config.operation_timeout_in_ms=5000 for your session.
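The hang warned about here is easy to model without TensorFlow: dequeueing from an under-fed queue blocks forever unless you bound the wait, which is the role config.operation_timeout_in_ms plays for a session. A plain-Python sketch of that defensive pattern (the filename is made up for illustration):

```python
import queue

filename_queue = queue.Queue()
filename_queue.put("/tmp/a.csv")  # hypothetical filename

drained = []
while True:
    try:
        # Without a timeout this get() would hang once the queue is empty,
        # just like an unbounded sess.run() on a dequeue op.
        drained.append(filename_queue.get(timeout=0.5))
    except queue.Empty:
        break
```

After the loop, `drained` holds the one filename and the program continues instead of hanging.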

For instance, the example below reads entries from the file /temp/pipeline/0 twice (the file has 10 entries), after which it prints the entries from /temp/pipeline/1.

Create some test data:

    import os

    def dump_numbers_to_file(fname, start_num, end_num):
      with open(fname, 'w') as f:
        for i in range(start_num, end_num):
          f.write(str(i)+"\n")

    num_files=10
    num_entries_per_file=10
    file_root="/temp/pipeline"
    os.system('mkdir -p '+file_root)
    for fi in range(num_files):
      fname = file_root+"/"+str(fi)
      dump_numbers_to_file(fname, fi*num_entries_per_file, (fi+1)*num_entries_per_file)
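As a quick sanity check on the generated data (written against a temporary directory instead of /temp/pipeline so it is self-contained), each file should hold ten consecutive numbers, one per line:

```python
import os
import tempfile

def dump_numbers_to_file(fname, start_num, end_num):
    with open(fname, 'w') as f:
        for i in range(start_num, end_num):
            f.write(str(i) + "\n")

root = tempfile.mkdtemp()
for fi in range(10):
    dump_numbers_to_file(os.path.join(root, str(fi)), fi * 10, (fi + 1) * 10)

# File "3" should contain the numbers 30..39, one per line.
with open(os.path.join(root, "3")) as f:
    lines = f.read().split()
```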

A helper utility for creating sessions:

    import tensorflow as tf

    def create_session():
      """Resets local session, returns new InteractiveSession"""
      config = tf.ConfigProto(log_device_placement=True)
      config.gpu_options.per_process_gpu_memory_fraction=0.3 # don't hog all vRAM
      config.operation_timeout_in_ms=15000   # terminate on long hangs
      sess = tf.InteractiveSession("", config=config)
      return sess

Run your example:

    tf.reset_default_graph()
    filename_queue = tf.FIFOQueue(capacity=10, dtypes=[tf.string])
    enqueue_op = filename_queue.enqueue("/temp/pipeline/0")
    sess = create_session()
    sess.run(enqueue_op)
    sess.run(enqueue_op)
    # filename queue now has [/temp/pipeline/0, /temp/pipeline/0]
    reader = tf.TextLineReader()
    key, value = reader.read(filename_queue)
    numeric_val, = tf.decode_csv(value, record_defaults=[[-1]])
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)

    for i in range(10):
      print(sess.run([numeric_val]))

    # filename queue now has [/temp/pipeline/0]
    print('size before', sess.run(filename_queue.size()))
    sess.run(filename_queue.enqueue("/temp/pipeline/1"))

    # filename queue now has [/temp/pipeline/0, /temp/pipeline/1]
    print('size after', sess.run(filename_queue.size()))

    for i in range(10):
      print(sess.run([numeric_val]))

    # filename queue now has [/temp/pipeline/1]

    for i in range(10):
      print(sess.run([numeric_val]))

    # filename queue is now empty, next sess.run([numeric_val]) would hang

    coord.request_stop()
    coord.join(threads)

You should see:

    [0]
    [1]
    [2]
    [3]
    [4]
    [5]
    [6]
    [7]
    [8]
    [9]
    size before 1
    size after 2
    [0]
    [1]
    [2]
    [3]
    [4]
    [5]
    [6]
    [7]
    [8]
    [9]
    [10]
    [11]
    [12]
    [13]
    [14]
    [15]
    [16]
    [17]
    [18]
    [19]