无法限制由 Pub/Sub 启动的云 运行 实例的数量

Unable to limit number of Cloud Run instances spun up by Pub/Sub

情况: 我正在尝试让 Pub/Sub 中的一条消息由恰好 1 个 Cloud 运行 实例处理。其他消息将由 Cloud 运行 的另一个实例处理。每条消息都会触发在云 运行 实例中运行大约 100 秒的繁重计算。

目前,云 运行 配置为最大并发请求数 = 1,min/max 个 0/5 实例。订阅设置为允许 600 秒确认截止日期。

问题: 每条消息似乎都在触发 Cloud 运行 的多个实例启动。我认为这是由于 CPU 的高利用率导致云 运行 启动额外的实例来帮助处理。不幸的是,这些新实例正在尝试处理完全相同的消息,从而导致意外结果。

问题: 有没有办法强制 Cloud 运行 只有 1 个实例处理一条消息,而不考虑 CPU 利用率和其他潜在因素?

相关代码片段:

import base64
import json

from fastapi import FastAPI, Request, Response

app = FastAPI()

@app.post("/")
async def handleMessage(request: Request):

  envelope = await request.json()

  # Basic data validation
  if not envelope:
      msg = "no Pub/Sub message received"
      print(f"error: {msg}")
      return Response(content=msg, status_code=400)

  if not isinstance(envelope, dict) or "message" not in envelope:
      msg = "invalid Pub/Sub message format"
      print(f"error: {msg}")
      return Response(content=msg, status_code=400)

  message = envelope["message"]

  if isinstance(message, dict) and "data" in message:
      data = json.loads(base64.b64decode(message["data"]).decode("utf-8").strip())

  try:
      # Do computationally heavy operations here
      # Will run for about 100s

      return Response(status_code=204)

  except Exception as e:
      print(e)

谢谢!

我找到问题了。

显然,Pub/Sub 保证“至少一次”传递,这意味着它可以多次向订阅者传递一条消息。因此,订阅者有责任优雅地处理这种情况(幂等性)。

在我的例子中是云 运行