如何查看 google pub/sub 何时完成

How to see when google pub/sub completes

从客户那里,我有以下代码:

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)
future = publisher.publish(topic_path, data=json.dumps(dict(op='create_master', review_id=1273612)))

有没有办法检查项目何时完成处理?如果是这样,那将如何完成?现在,我不知道是否有人'works'。

要知道消息已经发布成功,还需要看future的结果。首选方法是异步执行此操作:

def callback(future):
  try:
    print(future.result()) # future.result() is the message ID for the published message.
  except Exception as e:
    print("Error publishing: " + str(e))

future = publisher.publish(topic_path, data=json.dumps(dict(op='create_master', review_id=1273612)))
future.add_done_callback(callback)

如果需要,您也可以同步执行此操作。在发布结果可用之前调用 result() 将阻塞:

future = publisher.publish(topic_path, data=json.dumps(dict(op='create_master', review_id=1273612)))
try:
  print(future.result()) # future.result() is the message ID for the published message.
except Exception as e:
  print("Error publishing: " + str(e))

没有内置方法可以知道订阅者何时完成消息处理。要求发布者知道订阅者何时处理消息是一种反模式;发布者和订阅者旨在分隔彼此不直接了解的实体。话虽这么说,如果您需要此类信息,最好的方法是设置第二个主题,您的原始订阅者在完成处理后发布消息,您的原始发布者可以订阅以了解处理时间完成了。

设置它的一种方法是将其存储在基于 message_id 的数据库中。例如,这里是一些示例服务器代码:

def callback(message):

    # Message has been received by the Server/Subscriber
    cursor.execute('INSERT IGNORE INTO pubsub (id, message, received) VALUES (%s, %s, NOW())', (message.message_id, message.data))
    connection.commit()

    # Message is processed by the Server/Subscriber
    data_obj = loads(message.data)
    _process(data_obj)

    # Message has finished being processed by the Server/Subscriber
    cursor.execute('UPDATE pubsub SET completed=NOW() WHERE id=%s', (message.message_id,))
    connection.commit()
    message.ack()

客户端可以通过 future.result() 访问 id,因此可以轻松查询以查看状态。如果在单独的进程中查看状态(例如,如果 100 个 long-运行 进程 运行 并且我们想要跟踪哪些进程已完成),这将特别有用。