我如何使用 Python 从 Google Pub/Sub 中读取足够快
How could I read fast enough from Google Pub/Sub using Python
我正在尝试从实时 public projects/pubsub-public-data/topics/taxirides-realtime 流中读取消息,看来我处理数据的速度不够快,或者有确认问题。 "Unacked message count" 不断增加我正在做的事情(即使我在 运行 我的代码之前清除消息)。我从我家 Windows 10 台 PC、基于 GCP 的 Ubuntu 虚拟机和 GCP 控制台终端尝试了 运行 相同的代码,结果相同。
附加信息:在我的一个 GCP 项目中,我为 public projects/pubsub-public-data/topics/taxirides-realtime PubSub 主题创建了一个订阅 "taxi-ride-client",我的应用程序正在阅读该主题。消息到达我的程序,但处理缓慢或不正确。
我做错了什么,还是 Python 太慢了?这是我的代码:
import os
from google.cloud import pubsub_v1
def callback(message):
''' Processing PubSub messages '''
message.ack()
if __name__ == '__main__':
project_name = '<projectname>'
credfile = '<credfilename>.json'
subscription_name = 'taxi-ride-client'
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = credfile
subscriber = pubsub_v1.SubscriberClient()
subscription = subscriber.subscription_path(project_name, subscription_name)
subscr_future = subscriber.subscribe(subscription, callback=callback)
print('Listening for messages via: {}'.format(subscription))
try:
subscr_future.result(timeout=600) # running for 10 minutes
except Exception as ex:
subscr_future.cancel()
print('\nNormal program termination.\n')
流每小时产生大约 8-10 百万条记录,其中只有不到 0.5% 符合我回调中的 IF 条件。无论如何,我还尝试了一个只包含确认行的完全空的回调。
我也 运行 这个小程序在 5 个单独的副本中从同一个订阅中读取,但即使在那种情况下我也无法改变。这表明我对确认有疑问。
我做错了什么?
顺便说一下,我使用 GC DataFlow 实现了解决方案,第一步是从 PubSub 主题中读取,并且在 Python 下运行良好。那是一个不同的库和不同的架构。但它每小时轻松处理 9 000 000 条消息。
我仍然很好奇,这应该如何使用 python 和纯 PubSub(没有 Beam)来完成。
(更新)
复制
- 创建的 GCP 项目名称:
<your-test-project>
- 服务帐户文件使用 Project/Owner 角色创建,凭据文件以 JSON 格式下载
- 在命令 shell 中创建的订阅:
gcloud pubsub subscriptions create projects/<your-test-project>/subscriptions/taxi-ride-client --topic=projects/pubsub-public-data/topics/taxirides-realtime --ack-deadline=60 --message-retention-duration=6h
- Python 3.7 虚拟环境 google-cloud-pubsub(版本 1.1.0)
- 运行替换
<projectname>
和<credfilename>
后的代码。源代码here
嘉宝
由于 Python 运行时在多线程处理方面的固有局限性,云 Pub/Sub 中的高吞吐量很难实现。 Dataflow 不使用 Python 来实现其从 Pub/Sub 读取的实现,因此它不受此类限制。 Java 和 Go 往往具有更好的单机多核性能特征,因此一个选择是切换语言。或者,您将不得不水平扩展并调出更多客户端实例,以便您可以并行处理更多数据。您可能会发现 blog post on client library performance 是一本有趣的读物。
游戏后期但是:
- 您是否考虑过延长截止日期?您的客户端代码显示未来 10 分钟超时,但 PubSub 仍会在 1 分钟后取消它。后者尝试 600s。
- 多个消费者可能是一个选项,但您需要实现同步拉动与异步回调。
考虑到您在处理拉取时的延迟,这可能是更好的选择。您可以批量消费流式发布的内容(Pub-Sub 的目的)。
在实现多线程之前 - 或者如果消息处理是 CPU 绑定的多处理 - 从单个子拉开始并首先使用消息计数,然后根据需要添加 threads/processes。
我正在尝试从实时 public projects/pubsub-public-data/topics/taxirides-realtime 流中读取消息,看来我处理数据的速度不够快,或者有确认问题。 "Unacked message count" 不断增加我正在做的事情(即使我在 运行 我的代码之前清除消息)。我从我家 Windows 10 台 PC、基于 GCP 的 Ubuntu 虚拟机和 GCP 控制台终端尝试了 运行 相同的代码,结果相同。
附加信息:在我的一个 GCP 项目中,我为 public projects/pubsub-public-data/topics/taxirides-realtime PubSub 主题创建了一个订阅 "taxi-ride-client",我的应用程序正在阅读该主题。消息到达我的程序,但处理缓慢或不正确。
我做错了什么,还是 Python 太慢了?这是我的代码:
import os
from google.cloud import pubsub_v1
def callback(message):
''' Processing PubSub messages '''
message.ack()
if __name__ == '__main__':
project_name = '<projectname>'
credfile = '<credfilename>.json'
subscription_name = 'taxi-ride-client'
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = credfile
subscriber = pubsub_v1.SubscriberClient()
subscription = subscriber.subscription_path(project_name, subscription_name)
subscr_future = subscriber.subscribe(subscription, callback=callback)
print('Listening for messages via: {}'.format(subscription))
try:
subscr_future.result(timeout=600) # running for 10 minutes
except Exception as ex:
subscr_future.cancel()
print('\nNormal program termination.\n')
流每小时产生大约 8-10 百万条记录,其中只有不到 0.5% 符合我回调中的 IF 条件。无论如何,我还尝试了一个只包含确认行的完全空的回调。
我也 运行 这个小程序在 5 个单独的副本中从同一个订阅中读取,但即使在那种情况下我也无法改变。这表明我对确认有疑问。
我做错了什么?
顺便说一下,我使用 GC DataFlow 实现了解决方案,第一步是从 PubSub 主题中读取,并且在 Python 下运行良好。那是一个不同的库和不同的架构。但它每小时轻松处理 9 000 000 条消息。
我仍然很好奇,这应该如何使用 python 和纯 PubSub(没有 Beam)来完成。
(更新)
复制
- 创建的 GCP 项目名称:
<your-test-project>
- 服务帐户文件使用 Project/Owner 角色创建,凭据文件以 JSON 格式下载
- 在命令 shell 中创建的订阅:
gcloud pubsub subscriptions create projects/<your-test-project>/subscriptions/taxi-ride-client --topic=projects/pubsub-public-data/topics/taxirides-realtime --ack-deadline=60 --message-retention-duration=6h
- Python 3.7 虚拟环境 google-cloud-pubsub(版本 1.1.0)
- 运行替换
<projectname>
和<credfilename>
后的代码。源代码here
嘉宝
由于 Python 运行时在多线程处理方面的固有局限性,云 Pub/Sub 中的高吞吐量很难实现。 Dataflow 不使用 Python 来实现其从 Pub/Sub 读取的实现,因此它不受此类限制。 Java 和 Go 往往具有更好的单机多核性能特征,因此一个选择是切换语言。或者,您将不得不水平扩展并调出更多客户端实例,以便您可以并行处理更多数据。您可能会发现 blog post on client library performance 是一本有趣的读物。
游戏后期但是:
- 您是否考虑过延长截止日期?您的客户端代码显示未来 10 分钟超时,但 PubSub 仍会在 1 分钟后取消它。后者尝试 600s。
- 多个消费者可能是一个选项,但您需要实现同步拉动与异步回调。
考虑到您在处理拉取时的延迟,这可能是更好的选择。您可以批量消费流式发布的内容(Pub-Sub 的目的)。
在实现多线程之前 - 或者如果消息处理是 CPU 绑定的多处理 - 从单个子拉开始并首先使用消息计数,然后根据需要添加 threads/processes。