如何查看 google pub/sub 何时完成
How to see when google pub/sub completes
从客户那里,我有以下代码:
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)
future = publisher.publish(topic_path, data=json.dumps(dict(op='create_master', review_id=1273612)))
有没有办法检查项目何时完成处理?如果是这样,那将如何完成?现在,我不知道是否有人'works'。
要知道消息已经发布成功,还需要看future的结果。首选方法是异步执行此操作:
def callback(future):
try:
print(future.result()) # future.result() is the message ID for the published message.
except Exception as e:
print("Error publishing: " + str(e))
future = publisher.publish(topic_path, data=json.dumps(dict(op='create_master', review_id=1273612)))
future.add_done_callback(callback)
如果需要,您也可以同步执行此操作。在发布结果可用之前调用 result()
将阻塞:
future = publisher.publish(topic_path, data=json.dumps(dict(op='create_master', review_id=1273612)))
try:
print(future.result()) # future.result() is the message ID for the published message.
except Exception as e:
print("Error publishing: " + str(e))
没有内置方法可以知道订阅者何时完成消息处理。要求发布者知道订阅者何时处理消息是一种反模式;发布者和订阅者旨在分隔彼此不直接了解的实体。话虽这么说,如果您需要此类信息,最好的方法是设置第二个主题,您的原始订阅者在完成处理后发布消息,您的原始发布者可以订阅以了解处理时间完成了。
设置它的一种方法是将其存储在基于 message_id
的数据库中。例如,这里是一些示例服务器代码:
def callback(message):
# Message has been received by the Server/Subscriber
cursor.execute('INSERT IGNORE INTO pubsub (id, message, received) VALUES (%s, %s, NOW())', (message.message_id, message.data))
connection.commit()
# Message is processed by the Server/Subscriber
data_obj = loads(message.data)
_process(data_obj)
# Message has finished being processed by the Server/Subscriber
cursor.execute('UPDATE pubsub SET completed=NOW() WHERE id=%s', (message.message_id,))
connection.commit()
message.ack()
客户端可以通过 future.result()
访问 id
,因此可以轻松查询以查看状态。如果在单独的进程中查看状态(例如,如果 100 个 long-运行 进程 运行 并且我们想要跟踪哪些进程已完成),这将特别有用。
从客户那里,我有以下代码:
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)
future = publisher.publish(topic_path, data=json.dumps(dict(op='create_master', review_id=1273612)))
有没有办法检查项目何时完成处理?如果是这样,那将如何完成?现在,我不知道是否有人'works'。
要知道消息已经发布成功,还需要看future的结果。首选方法是异步执行此操作:
def callback(future):
try:
print(future.result()) # future.result() is the message ID for the published message.
except Exception as e:
print("Error publishing: " + str(e))
future = publisher.publish(topic_path, data=json.dumps(dict(op='create_master', review_id=1273612)))
future.add_done_callback(callback)
如果需要,您也可以同步执行此操作。在发布结果可用之前调用 result()
将阻塞:
future = publisher.publish(topic_path, data=json.dumps(dict(op='create_master', review_id=1273612)))
try:
print(future.result()) # future.result() is the message ID for the published message.
except Exception as e:
print("Error publishing: " + str(e))
没有内置方法可以知道订阅者何时完成消息处理。要求发布者知道订阅者何时处理消息是一种反模式;发布者和订阅者旨在分隔彼此不直接了解的实体。话虽这么说,如果您需要此类信息,最好的方法是设置第二个主题,您的原始订阅者在完成处理后发布消息,您的原始发布者可以订阅以了解处理时间完成了。
设置它的一种方法是将其存储在基于 message_id
的数据库中。例如,这里是一些示例服务器代码:
def callback(message):
# Message has been received by the Server/Subscriber
cursor.execute('INSERT IGNORE INTO pubsub (id, message, received) VALUES (%s, %s, NOW())', (message.message_id, message.data))
connection.commit()
# Message is processed by the Server/Subscriber
data_obj = loads(message.data)
_process(data_obj)
# Message has finished being processed by the Server/Subscriber
cursor.execute('UPDATE pubsub SET completed=NOW() WHERE id=%s', (message.message_id,))
connection.commit()
message.ack()
客户端可以通过 future.result()
访问 id
,因此可以轻松查询以查看状态。如果在单独的进程中查看状态(例如,如果 100 个 long-运行 进程 运行 并且我们想要跟踪哪些进程已完成),这将特别有用。