Predict/infer 与 Keras 或 TF 模型异步?

Predict/infer asynchronously with a Keras or TF model?

我一直在尝试使用队列和线程对 keras 模型进行异步 运行 预测,同时编译输入然后即时检索输出。

为此,我尝试在从队列中提取输入的生成器上使用 model.predict(),然后使用自定义 on_predict_batch_end 回调将结果尽快推送到另一个队列被计算出来。

我的方法 运行s,但不幸的是我似乎无法从回调中检索预测。我只能得到一个不让我访问实际输出的非急切张量。 tf.config.run_functions_eagerly(True) 没有帮助。

以下是我正在尝试做的事情的简要总结:

from queue import Queue
from threading import Thread


# i/o queues
input_queue = Queue(1)
output_queue = Queue(1)

# inputs generator:
def feed():
    while True:
        yield input_queue.get()

# Custom Keras callback that is supposed to pull predictions on the fly
# and add them to output queue:
class CustomCallback(tf.keras.callbacks.Callback):
    def on_predict_batch_end(self, batch, logs=None):
        # Retrieve "output" put in output queue:
        output_queue.put(self.model.layers[-1].output)

# Prediction function to use in thread call:
def prediction_function():
    model.predict(feed(),verbose=1, callbacks=[CustomCallback()])

# Start prediction thread:
pt = Thread(target=prediction_function, daemon=True)
pt.start()

# Add input to the queue and retrieve when output is ready:
for i in input_data:
    iq.put(i)
    output_data += [oq.get()]

这个 运行 很好,但是我在 output_data 列表中得到的又是一个非热切张量。因为它是非急切的 .numpy() 不起作用并且 .eval() 会破坏目的。有没有一种方法可以在不为队列中的每个新输入再次调用 model.predict() 的情况下即时访问预测,这非常慢?

解决了我的问题。如果您查看 keras 的预测方法,它会将批处理输出通过日志传递给 on_predict_batch_end()。您需要在回调中做的就是将其转发到队列:

class CustomCallback(tf.keras.callbacks.Callback):
    def on_predict_batch_end(self, batch, logs=None):
        # Retrieve "output" put in output queue:
        output_queue.put(logs['outputs'])