GCP Cloud Function 未正确选取 up/acknowledging PubSub 消息
GCP Cloud Function not correctly picking up/acknowledging PubSub messages
我在 Google Cloud Platform 中设置了一些数据处理工作流程。这些位置处理物理地址和 return 一些关于它们的指标。工作流使用 Cloud Functions 和 PubSub 流的组合。
在工作流中有一个 Google Cloud Functions 时,一些消息未从触发流中提取或多次提取。我知道在某种程度上这是意料之中的。然而,这种情况经常发生。这足以导致某些位置夸大 10 倍,而其他几个位置没有结果。
我认为 callback
函数没有正确确认消息,但我不确定应该有什么不同才能更可靠地接收和确认消息。任何建议表示赞赏。
我用于检索指标的 GCP 云函数由 PubSub 流触发并执行 retrieve_location
函数将数据发送到不同的 PubSub 流。 retrieve_location
函数如下所示:
def retrieve_location(event, context):
auth_flow()
project_id = <my project id>
subscription_name = <my subscription name>
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
project_id, subscription_name)
def callback(message):
message.ack()
message_obj = message.data
message_dcde = message_obj.decode('utf-8')
message_json = json.loads(message_dcde)
get_metrics(message_json)
subscriber.subscribe(subscription_path, callback=callback)
get_metrics
函数从每条消息中获取有效负载,检索一些数据并将其发送到另一个流。此功能似乎按预期工作。
def get_metrics(loc):
<... retrieve and process data, my_data is the object that gets sent to the next stream ...>
project_id = <my project id>
topic_name = <my topic name>
topic_id = <my topic id>
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)
try:
publisher.publish(topic_path, data=my_data.encode('utf-8'))
except Exception as exc:
print("topic publish failed: ", exc)
与其在您的 Cloud Function 中设置第二个 Pub/Sub 订阅者,不如创建一个订阅该主题的 background function 直接处理负载,例如:
def get_metrics_background_function(event, context):
message_obj = event.data
message_dcde = message_obj.decode('utf-8')
message_json = json.loads(message_dcde)
get_metrics(message_json)
您似乎将使用 Cloud Pub/Sub 来触发 Cloud Function 与直接通过 Cloud Pub/Sub 客户端库使用 Pub/Sub 混为一谈。通常,您会想要做其中之一。
如果您创建的订阅是通过 Cloud Functions 完成的,那么您 retrieve_location
函数并没有真正接收和处理消息。相反,它正在做的是启动一个订户客户端,并在不久之后关闭,因为 subscriber.subscribe
只会 运行 完成,因此你的函数将完成执行。
如果此函数正在启动触发 Cloud Function 的同一订阅的客户端,那么它实际上不会执行任何操作,因为 Cloud-Function-based 订阅使用 push model while the client library should be used with the pull 模型。
您要么想直接在 retrieve_location
中执行 callback
中的操作,使用事件作为消息(如 Dustin 所描述的),要么您想要设置一个持久订阅者客户端库,例如,在 GCE 上,它实例化订阅者并在其上调用 subscribe
。
我在 Google Cloud Platform 中设置了一些数据处理工作流程。这些位置处理物理地址和 return 一些关于它们的指标。工作流使用 Cloud Functions 和 PubSub 流的组合。
在工作流中有一个 Google Cloud Functions 时,一些消息未从触发流中提取或多次提取。我知道在某种程度上这是意料之中的。然而,这种情况经常发生。这足以导致某些位置夸大 10 倍,而其他几个位置没有结果。
我认为 callback
函数没有正确确认消息,但我不确定应该有什么不同才能更可靠地接收和确认消息。任何建议表示赞赏。
我用于检索指标的 GCP 云函数由 PubSub 流触发并执行 retrieve_location
函数将数据发送到不同的 PubSub 流。 retrieve_location
函数如下所示:
def retrieve_location(event, context):
auth_flow()
project_id = <my project id>
subscription_name = <my subscription name>
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
project_id, subscription_name)
def callback(message):
message.ack()
message_obj = message.data
message_dcde = message_obj.decode('utf-8')
message_json = json.loads(message_dcde)
get_metrics(message_json)
subscriber.subscribe(subscription_path, callback=callback)
get_metrics
函数从每条消息中获取有效负载,检索一些数据并将其发送到另一个流。此功能似乎按预期工作。
def get_metrics(loc):
<... retrieve and process data, my_data is the object that gets sent to the next stream ...>
project_id = <my project id>
topic_name = <my topic name>
topic_id = <my topic id>
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)
try:
publisher.publish(topic_path, data=my_data.encode('utf-8'))
except Exception as exc:
print("topic publish failed: ", exc)
与其在您的 Cloud Function 中设置第二个 Pub/Sub 订阅者,不如创建一个订阅该主题的 background function 直接处理负载,例如:
def get_metrics_background_function(event, context):
message_obj = event.data
message_dcde = message_obj.decode('utf-8')
message_json = json.loads(message_dcde)
get_metrics(message_json)
您似乎将使用 Cloud Pub/Sub 来触发 Cloud Function 与直接通过 Cloud Pub/Sub 客户端库使用 Pub/Sub 混为一谈。通常,您会想要做其中之一。
如果您创建的订阅是通过 Cloud Functions 完成的,那么您 retrieve_location
函数并没有真正接收和处理消息。相反,它正在做的是启动一个订户客户端,并在不久之后关闭,因为 subscriber.subscribe
只会 运行 完成,因此你的函数将完成执行。
如果此函数正在启动触发 Cloud Function 的同一订阅的客户端,那么它实际上不会执行任何操作,因为 Cloud-Function-based 订阅使用 push model while the client library should be used with the pull 模型。
您要么想直接在 retrieve_location
中执行 callback
中的操作,使用事件作为消息(如 Dustin 所描述的),要么您想要设置一个持久订阅者客户端库,例如,在 GCE 上,它实例化订阅者并在其上调用 subscribe
。