我如何使用 Python 从 Google Pub/Sub 中读取足够快

How could I read fast enough from Google Pub/Sub using Python

我正在尝试从实时 public projects/pubsub-public-data/topics/taxirides-realtime 流中读取消息,看来我处理数据的速度不够快,或者有确认问题。 "Unacked message count" 不断增加我正在做的事情(即使我在 运行 我的代码之前清除消息)。我从我家 Windows 10 台 PC、基于 GCP 的 Ubuntu 虚拟机和 GCP 控制台终端尝试了 运行 相同的代码,结果相同。

附加信息:在我的一个 GCP 项目中,我为 public projects/pubsub-public-data/topics/taxirides-realtime PubSub 主题创建了一个订阅 "taxi-ride-client",我的应用程序正在阅读该主题。消息到达我的程序,但处理缓慢或不正确。

我做错了什么,还是 Python 太慢了?这是我的代码:

import os
from google.cloud import pubsub_v1

def callback(message):
    ''' Processing PubSub messages '''
    message.ack()

if __name__ == '__main__':

    project_name = '<projectname>'
    credfile = '<credfilename>.json'
    subscription_name = 'taxi-ride-client'

    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = credfile

    subscriber = pubsub_v1.SubscriberClient()
    subscription = subscriber.subscription_path(project_name, subscription_name)
    subscr_future = subscriber.subscribe(subscription, callback=callback)
    print('Listening for messages via: {}'.format(subscription))

    try:
        subscr_future.result(timeout=600)   # running for 10 minutes
    except Exception as ex:
        subscr_future.cancel()

    print('\nNormal program termination.\n')

流每小时产生大约 8-10 百万条记录,其中只有不到 0.5% 符合我回调中的 IF 条件。无论如何,我还尝试了一个只包含确认行的完全空的回调。

我也 运行 这个小程序在 5 个单独的副本中从同一个订阅中读取,但即使在那种情况下我也无法改变。这表明我对确认有疑问。

我做错了什么?

顺便说一下,我使用 GC DataFlow 实现了解决方案,第一步是从 PubSub 主题中读取,并且在 Python 下运行良好。那是一个不同的库和不同的架构。但它每小时轻松处理 9 000 000 条消息。

我仍然很好奇,这应该如何使用 python 和纯 PubSub(没有 Beam)来完成。

(更新)

复制

  1. 创建的 GCP 项目名称:<your-test-project>
  2. 服务帐户文件使用 Project/Owner 角色创建,凭据文件以 JSON 格式下载
  3. 在命令 shell 中创建的订阅:gcloud pubsub subscriptions create projects/<your-test-project>/subscriptions/taxi-ride-client --topic=projects/pubsub-public-data/topics/taxirides-realtime --ack-deadline=60 --message-retention-duration=6h
  4. Python 3.7 虚拟环境 google-cloud-pubsub(版本 1.1.0)
  5. 运行替换<projectname><credfilename>后的代码。源代码here

嘉宝

由于 Python 运行时在多线程处理方面的固有局限性,云 Pub/Sub 中的高吞吐量很难实现。 Dataflow 不使用 Python 来实现其从 Pub/Sub 读取的实现,因此它不受此类限制。 Java 和 Go 往往具有更好的单机多核性能特征,因此一个选择是切换语言。或者,您将不得不水平扩展并调出更多客户端实例,以便您可以并行处理更多数据。您可能会发现 blog post on client library performance 是一本有趣的读物。

游戏后期但是:

  1. 您是否考虑过延长截止日期?您的客户端代码显示未来 10 分钟超时,但 PubSub 仍会在 1 分钟后取消它。后者尝试 600s。
  2. 多个消费者可能是一个选项,但您需要实现同步拉动与异步回调。

考虑到您在处理拉取时的延迟,这可能是更好的选择。您可以批量消费流式发布的内容(Pub-Sub 的目的)。

在实现多线程之前 - 或者如果消息处理是 CPU 绑定的多处理 - 从单个子拉开始并首先使用消息计数,然后根据需要添加 threads/processes。