一个应用程序中有多个 pub/sub 订阅者
Multiple pub/sub subscribers in one application
我正在尝试将两个不同的 google pub/sub 订阅者设置为不同的订阅,但使用相同的代码。为了画得更好,假设我有 topic1 和 topic2。然后我有订阅 topic1 的 subscription1 和订阅 topic2 的 subscription2。然后我有链接到 subscription1 的 subscriber1 和链接到 subscription2 的 subscriber2。我的问题是如何在同一个应用程序中使用 subscriber1 和 subscriber2。我仅针对 1 个订户的示例是(来自文档):
project_id = "my-project-id"
subscription_id = "subscription1"
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
logging.info("Listening for messages on {}..\n".format(subscription_path))
# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
try:
# When `timeout` is not set, result() will block indefinitely,
# unless an exception is encountered first.
streaming_pull_future.result()
except TimeoutError:
streaming_pull_future.cancel()
如何将 subscription2 添加到其中,以便我的 python 应用程序可以从 topic1 和 topic2 获取消息?我在文档中找不到它,但如果我只是想念它,请告诉我!
如果您想同时接收来自两个订阅的消息,您可以创建两个 SubscriberClient 实例,每个订阅一个。要合并期货,您可以使用事件:
project_id = "my-project-id"
subscription_id1 = "subscription1"
subscription_id2 = "subscription2"
subscriber1 = pubsub_v1.SubscriberClient()
subscriber2 = pubsub_v1.SubscriberClient()
subscription_path1 = subscriber.subscription_path(project_id, subscription_id1)
subscription_path2 = subscriber.subscription_path(project_id, subscription_id2)
streaming_pull_future1 = subscriber1.subscribe(subscription_path1, callback=callback)
logging.info("Listening for messages on {}.".format(subscription_path1))
streaming_pull_future2 = subscriber2.subscribe(subscription_path2, callback=callback)
logging.info("Listening for messages on {}.".format(subscription_path2))
subscriber_shutdown = threading.Event()
streaming_pull_future1.add_done_callback(lambda result: subscriber_shutdown.set())
streaming_pull_future2.add_done_callback(lambda result: subscriber_shutdown.set())
# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber1, subscriber2:
subscriber_shutdown.wait()
streaming_pull_future1.cancel()
streaming_pull_future2.cancel()
我正在尝试将两个不同的 google pub/sub 订阅者设置为不同的订阅,但使用相同的代码。为了画得更好,假设我有 topic1 和 topic2。然后我有订阅 topic1 的 subscription1 和订阅 topic2 的 subscription2。然后我有链接到 subscription1 的 subscriber1 和链接到 subscription2 的 subscriber2。我的问题是如何在同一个应用程序中使用 subscriber1 和 subscriber2。我仅针对 1 个订户的示例是(来自文档):
project_id = "my-project-id"
subscription_id = "subscription1"
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
logging.info("Listening for messages on {}..\n".format(subscription_path))
# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
try:
# When `timeout` is not set, result() will block indefinitely,
# unless an exception is encountered first.
streaming_pull_future.result()
except TimeoutError:
streaming_pull_future.cancel()
如何将 subscription2 添加到其中,以便我的 python 应用程序可以从 topic1 和 topic2 获取消息?我在文档中找不到它,但如果我只是想念它,请告诉我!
如果您想同时接收来自两个订阅的消息,您可以创建两个 SubscriberClient 实例,每个订阅一个。要合并期货,您可以使用事件:
project_id = "my-project-id"
subscription_id1 = "subscription1"
subscription_id2 = "subscription2"
subscriber1 = pubsub_v1.SubscriberClient()
subscriber2 = pubsub_v1.SubscriberClient()
subscription_path1 = subscriber.subscription_path(project_id, subscription_id1)
subscription_path2 = subscriber.subscription_path(project_id, subscription_id2)
streaming_pull_future1 = subscriber1.subscribe(subscription_path1, callback=callback)
logging.info("Listening for messages on {}.".format(subscription_path1))
streaming_pull_future2 = subscriber2.subscribe(subscription_path2, callback=callback)
logging.info("Listening for messages on {}.".format(subscription_path2))
subscriber_shutdown = threading.Event()
streaming_pull_future1.add_done_callback(lambda result: subscriber_shutdown.set())
streaming_pull_future2.add_done_callback(lambda result: subscriber_shutdown.set())
# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber1, subscriber2:
subscriber_shutdown.wait()
streaming_pull_future1.cancel()
streaming_pull_future2.cancel()