GCP Cloud Function 未正确选取 up/acknowledging PubSub 消息

GCP Cloud Function not correctly picking up/acknowledging PubSub messages

我在 Google Cloud Platform 中设置了一些数据处理工作流程。这些位置处理物理地址和 return 一些关于它们的指标。工作流使用 Cloud Functions 和 PubSub 流的组合。

在工作流中有一个 Google Cloud Functions 时,一些消息未从触发流中提取或多次提取。我知道在某种程度上这是意料之中的。然而,这种情况经常发生。这足以导致某些位置夸大 10 倍,而其他几个位置没有结果。

我认为 callback 函数没有正确确认消息,但我不确定应该有什么不同才能更可靠地接收和确认消息。任何建议表示赞赏。

我用于检索指标的 GCP 云函数由 PubSub 流触发并执行 retrieve_location 函数将数据发送到不同的 PubSub 流。 retrieve_location 函数如下所示:

def retrieve_location(event, context):
    auth_flow()

    project_id = <my project id>
    subscription_name = <my subscription name>

    subscriber = pubsub_v1.SubscriberClient()

    subscription_path = subscriber.subscription_path(
        project_id, subscription_name)

    def callback(message):
        message.ack()
        message_obj = message.data
        message_dcde = message_obj.decode('utf-8')
        message_json = json.loads(message_dcde)

        get_metrics(message_json)


    subscriber.subscribe(subscription_path, callback=callback)

get_metrics 函数从每条消息中获取有效负载,检索一些数据并将其发送到另一个流。此功能似乎按预期工作。

def get_metrics(loc):
    <... retrieve and process data, my_data is the object that gets sent to the next stream ...>
          project_id = <my project id>
          topic_name = <my topic name>
          topic_id = <my topic id>

          publisher = pubsub_v1.PublisherClient()
          topic_path = publisher.topic_path(project_id, topic_name)

            try:
                publisher.publish(topic_path, data=my_data.encode('utf-8'))
            except Exception as exc:
                    print("topic publish failed: ", exc)

与其在您的 Cloud Function 中设置第二个 Pub/Sub 订阅者,不如创建一个订阅该主题的 background function 直接处理负载,例如:

def get_metrics_background_function(event, context):
    message_obj = event.data
    message_dcde = message_obj.decode('utf-8')
    message_json = json.loads(message_dcde)

    get_metrics(message_json)

您似乎将使用 Cloud Pub/Sub 来触发 Cloud Function 与直接通过 Cloud Pub/Sub 客户端库使用 Pub/Sub 混为一谈。通常,您会想要做其中之一。

如果您创建的订阅是通过 Cloud Functions 完成的,那么您 retrieve_location 函数并没有真正接收和处理消息。相反,它正在做的是启动一个订户客户端,并在不久之后关闭,因为 subscriber.subscribe 只会 运行 完成,因此你的函数将完成执行。

如果此函数正在启动触发 Cloud Function 的同一订阅的客户端,那么它实际上不会执行任何操作,因为 Cloud-Function-based 订阅使用 push model while the client library should be used with the pull 模型。

您要么想直接在 retrieve_location 中执行 callback 中的操作,使用事件作为消息(如 Dustin 所描述的),要么您想要设置一个持久订阅者客户端库,例如,在 GCE 上,它实例化订阅者并在其上调用 subscribe