如何使用 GCP Pub/Sub 从订阅中提取消息?
How to pull messages from a subscription with GCP Pub/Sub?
我对 GCP 感到困惑 Pub/Sub REST API。
背景: 我正在尝试使用 GCP Pub/Sub 编写一个应用程序,其中该语言不会作为客户端库退出(我正在尝试使用 R) .
因此,我需要依赖 REST API 提供的:https://cloud.google.com/pubsub/docs/reference/rest
根据我对 REST 的理解 API,我们将不得不使用请求订阅:https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions/pull
问题:我的困惑是这是一个POST请求
POST https://pubsub.googleapis.com/v1/{subscription}:pull
此 POST 请求有响应:
{
"receivedMessages": [
{
object (ReceivedMessage)
}
]
}
我怎样才能收到来自 POST 请求的响应?这对我来说没有意义。
我的目标是订阅 Pub/Sub 消息订阅,类似于 Python 图书馆 here:
To subscribe to data in Cloud Pub/Sub, you create a subscription based
on the topic, and subscribe to that, passing a callback function.
import os
from google.cloud import pubsub_v1
topic_name = 'projects/{project_id}/topics/{topic}'.format(
project_id=os.getenv('GOOGLE_CLOUD_PROJECT'),
topic='MY_TOPIC_NAME', # Set this to something appropriate.
)
subscription_name = 'projects/{project_id}/subscriptions/{sub}'.format(
project_id=os.getenv('GOOGLE_CLOUD_PROJECT'),
sub='MY_SUBSCRIPTION_NAME', # Set this to something appropriate.
)
def callback(message):
print(message.data)
message.ack()
with pubsub_v1.SubscriberClient() as subscriber:
subscriber.create_subscription(
name=subscription_name, topic=topic_name)
future = subscriber.subscribe(subscription_name, callback)
try:
future.result()
except KeyboardInterrupt:
future.cancel()
云 Pub/Sub 客户端库使用 streaming pull 接收消息而不是拉取。它还将用户从这些实际请求中抽象出来。客户端库本身接收消息列表,然后调用用户提供的回调。
如果您直接使用拉取请求,那么您需要遍历响应中的每条消息并调用每条消息的用户回调。 Cloud Pub/Sub documentation 在所有支持的语言中都有一个这样的例子,例如 Python:
from google.api_core import retry
from google.cloud import pubsub_v1
# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)
NUM_MESSAGES = 3
# Wrap the subscriber in a 'with' block to automatically call close() to
# close the underlying gRPC channel when done.
with subscriber:
# The subscriber pulls a specific number of messages. The actual
# number of messages pulled may be smaller than max_messages.
response = subscriber.pull(
request={"subscription": subscription_path, "max_messages": NUM_MESSAGES},
retry=retry.Retry(deadline=300),
)
ack_ids = []
for received_message in response.received_messages:
print(f"Received: {received_message.message.data}.")
ack_ids.append(received_message.ack_id)
# Acknowledges the received messages so they will not be sent again.
subscriber.acknowledge(
request={"subscription": subscription_path, "ack_ids": ack_ids}
)
print(
f"Received and acknowledged {len(response.received_messages)} messages from {subscription_path}."
)
我对 GCP 感到困惑 Pub/Sub REST API。
背景: 我正在尝试使用 GCP Pub/Sub 编写一个应用程序,其中该语言不会作为客户端库退出(我正在尝试使用 R) .
因此,我需要依赖 REST API 提供的:https://cloud.google.com/pubsub/docs/reference/rest
根据我对 REST 的理解 API,我们将不得不使用请求订阅:https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions/pull
问题:我的困惑是这是一个POST请求
POST https://pubsub.googleapis.com/v1/{subscription}:pull
此 POST 请求有响应:
{
"receivedMessages": [
{
object (ReceivedMessage)
}
]
}
我怎样才能收到来自 POST 请求的响应?这对我来说没有意义。
我的目标是订阅 Pub/Sub 消息订阅,类似于 Python 图书馆 here:
To subscribe to data in Cloud Pub/Sub, you create a subscription based on the topic, and subscribe to that, passing a callback function.
import os
from google.cloud import pubsub_v1
topic_name = 'projects/{project_id}/topics/{topic}'.format(
project_id=os.getenv('GOOGLE_CLOUD_PROJECT'),
topic='MY_TOPIC_NAME', # Set this to something appropriate.
)
subscription_name = 'projects/{project_id}/subscriptions/{sub}'.format(
project_id=os.getenv('GOOGLE_CLOUD_PROJECT'),
sub='MY_SUBSCRIPTION_NAME', # Set this to something appropriate.
)
def callback(message):
print(message.data)
message.ack()
with pubsub_v1.SubscriberClient() as subscriber:
subscriber.create_subscription(
name=subscription_name, topic=topic_name)
future = subscriber.subscribe(subscription_name, callback)
try:
future.result()
except KeyboardInterrupt:
future.cancel()
云 Pub/Sub 客户端库使用 streaming pull 接收消息而不是拉取。它还将用户从这些实际请求中抽象出来。客户端库本身接收消息列表,然后调用用户提供的回调。
如果您直接使用拉取请求,那么您需要遍历响应中的每条消息并调用每条消息的用户回调。 Cloud Pub/Sub documentation 在所有支持的语言中都有一个这样的例子,例如 Python:
from google.api_core import retry
from google.cloud import pubsub_v1
# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)
NUM_MESSAGES = 3
# Wrap the subscriber in a 'with' block to automatically call close() to
# close the underlying gRPC channel when done.
with subscriber:
# The subscriber pulls a specific number of messages. The actual
# number of messages pulled may be smaller than max_messages.
response = subscriber.pull(
request={"subscription": subscription_path, "max_messages": NUM_MESSAGES},
retry=retry.Retry(deadline=300),
)
ack_ids = []
for received_message in response.received_messages:
print(f"Received: {received_message.message.data}.")
ack_ids.append(received_message.ack_id)
# Acknowledges the received messages so they will not be sent again.
subscriber.acknowledge(
request={"subscription": subscription_path, "ack_ids": ack_ids}
)
print(
f"Received and acknowledged {len(response.received_messages)} messages from {subscription_path}."
)