Google PubSub - 计算主题中的消息

Google PubSub - Counting messages in topic

我查看了 Google 的 PubSub 文档,还尝试查看 Google Cloud Monitoring,但找不到任何方法来确定队列大小我的主题。

由于我计划使用 PubSub 进行分析,因此监控队列计数对我来说很重要,因此我可以扩展 up/down 订阅者数量。

我错过了什么?

您要查看的指标是 "undelivered messages." 您应该能够在 Google Cloud Monitoring under the "Pub/Sub Subscription" resource type. The number of messages that have not yet been acknowledged by subscribers, i.e., queue size, is a per-subscription metric as opposed to a per-topic metric. For info on the metric, see pubsub.googleapis.com/subscription/num_undelivered_messages in the GCP Metrics List 中设置警报或图表来监控此指标(以及所有 Pub/Sub 的其他指标)指标可用)。

您问题的答案是 "no",PubSub 没有显示这些计数的功能。你必须这样做的方法是使用 Stackdriver 通过日志事件监控(我也花了一些时间才找到它)。

对此的通俗回答是按以下步骤进行:

  1. 从 GCloud 管理控制台导航至:Monitoring

  1. 这会打开一个新的 window 和单独的 Stackdriver 控制台
  2. 在 Stackdriver 中导航:Dashboards > Create Dashboard

  1. 单击仪表板屏幕右上角的 Add Chart 按钮

  1. 在输入框中输入num_undelivered_messages然后输入SAVE

如果您正在寻找实现此目标的编程方式,这可能会有所帮助:

from google.cloud import monitoring_v3
from google.cloud.monitoring_v3 import query

project = "my-project"
client = monitoring_v3.MetricServiceClient()
result = query.Query(
         client,
         project,
         'pubsub.googleapis.com/subscription/num_undelivered_messages', 
         minutes=60).as_dataframe()

print(result['pubsub_subscription'][project]['subscription_name'][0])

基于@steeve 的更新版本。 (没有 pandas 依赖)

请注意,您必须指定 end_time 而不是使用默认值 utcnow()

import datetime
from google.cloud import monitoring_v3
from google.cloud.monitoring_v3 import query

project = 'my-project'
sub_name = 'my-sub'
client = monitoring_v3.MetricServiceClient()
result = query.Query(
  client,
  project,
  'pubsub.googleapis.com/subscription/num_undelivered_messages',
  end_time=datetime.datetime.now(),
  minutes=1,
  ).select_resources(subscription_id=sub_name)

for content in result:
  print(content.points[0].value.int64_value)

有一种方法可以使用自定义指标对发布到主题的所有消息进行计数。

在我的例子中,我通过运行 python 脚本的 Cloud Composer (Airflow) Dag 将消息发布到 Pub/Sub 主题。

python 脚本 returns 记录有关 运行 Dag 的信息。

logging.info(
f"Total events in file {counter-1}, total successfully published {counter - error_counter -1}, total errors publishing {error_counter}. Events sent to topic: {TOPIC_PATH} from filename: {source_blob_name}.",
{
"metric": "<some_name>",
"type": "completed_file",
"topic": EVENT_TOPIC,
"filename": source_blob_name,
"total_events_in_file": counter - 1,
"failed_published_messages": error_counter,
"successful_published_messages": counter - error_counter - 1,
}

然后我有一个 分布 自定义指标过滤 resource_type, resource_lablejsonPayload.metricjsonPayload.type。该指标还将 字段名称 设置为 jsonPayload.successful_published_messages

自定义指标过滤器:

resource.type=cloud_composer_environment AND resource.labels.environment_name={env_name} AND jsonPayload.metric=<some_name> AND jsonPayload.type=completed_file

然后在 MQL 设置为

的仪表板中使用该自定义指标
fetch cloud_composer_environment
| metric
'logging.googleapis.com/user/my_custom_metric'
| group_by 1d, [value_pubsub_aggregate: aggregate(value.pubsub)]
| every 1d
| group_by [],
[value_pubsub_aggregate_sum: sum(value_pubsub_aggregate)]

我首先使用 资源类型设置一个图标图表: 云作曲家环境,指标: my_custom度量,处​​理步骤:到无预处理步骤,对齐函数: SUM,周期 1,unit天,你想怎么分组group by function: mean.

理想情况下,您只需 select 按函数对分组求和,但它会出错,这就是为什么您需要 sqitch 到 MQL 并手动输入求和而不是均值。

这将计算您发布的消息长达 24 个月,这是 Google for the custom metrics 设置的保留期。