无法限制由 Pub/Sub 启动的云 运行 实例的数量
Unable to limit number of Cloud Run instances spun up by Pub/Sub
情况:
我正在尝试让 Pub/Sub 中的一条消息由恰好 1 个 Cloud 运行 实例处理。其他消息将由 Cloud 运行 的另一个实例处理。每条消息都会触发在云 运行 实例中运行大约 100 秒的繁重计算。
目前,云 运行 配置为最大并发请求数 = 1,min/max 个 0/5 实例。订阅设置为允许 600 秒确认截止日期。
问题:
每条消息似乎都在触发 Cloud 运行 的多个实例启动。我认为这是由于 CPU 的高利用率导致云 运行 启动额外的实例来帮助处理。不幸的是,这些新实例正在尝试处理完全相同的消息,从而导致意外结果。
问题:
有没有办法强制 Cloud 运行 只有 1 个实例处理一条消息,而不考虑 CPU 利用率和其他潜在因素?
相关代码片段:
import base64
import json
from fastapi import FastAPI, Request, Response
app = FastAPI()
@app.post("/")
async def handleMessage(request: Request):
envelope = await request.json()
# Basic data validation
if not envelope:
msg = "no Pub/Sub message received"
print(f"error: {msg}")
return Response(content=msg, status_code=400)
if not isinstance(envelope, dict) or "message" not in envelope:
msg = "invalid Pub/Sub message format"
print(f"error: {msg}")
return Response(content=msg, status_code=400)
message = envelope["message"]
if isinstance(message, dict) and "data" in message:
data = json.loads(base64.b64decode(message["data"]).decode("utf-8").strip())
try:
# Do computationally heavy operations here
# Will run for about 100s
return Response(status_code=204)
except Exception as e:
print(e)
谢谢!
我找到问题了。
显然,Pub/Sub 保证“至少一次”传递,这意味着它可以多次向订阅者传递一条消息。因此,订阅者有责任优雅地处理这种情况(幂等性)。
在我的例子中是云 运行
情况: 我正在尝试让 Pub/Sub 中的一条消息由恰好 1 个 Cloud 运行 实例处理。其他消息将由 Cloud 运行 的另一个实例处理。每条消息都会触发在云 运行 实例中运行大约 100 秒的繁重计算。
目前,云 运行 配置为最大并发请求数 = 1,min/max 个 0/5 实例。订阅设置为允许 600 秒确认截止日期。
问题: 每条消息似乎都在触发 Cloud 运行 的多个实例启动。我认为这是由于 CPU 的高利用率导致云 运行 启动额外的实例来帮助处理。不幸的是,这些新实例正在尝试处理完全相同的消息,从而导致意外结果。
问题: 有没有办法强制 Cloud 运行 只有 1 个实例处理一条消息,而不考虑 CPU 利用率和其他潜在因素?
相关代码片段:
import base64
import json
from fastapi import FastAPI, Request, Response
app = FastAPI()
@app.post("/")
async def handleMessage(request: Request):
envelope = await request.json()
# Basic data validation
if not envelope:
msg = "no Pub/Sub message received"
print(f"error: {msg}")
return Response(content=msg, status_code=400)
if not isinstance(envelope, dict) or "message" not in envelope:
msg = "invalid Pub/Sub message format"
print(f"error: {msg}")
return Response(content=msg, status_code=400)
message = envelope["message"]
if isinstance(message, dict) and "data" in message:
data = json.loads(base64.b64decode(message["data"]).decode("utf-8").strip())
try:
# Do computationally heavy operations here
# Will run for about 100s
return Response(status_code=204)
except Exception as e:
print(e)
谢谢!
我找到问题了。
显然,Pub/Sub 保证“至少一次”传递,这意味着它可以多次向订阅者传递一条消息。因此,订阅者有责任优雅地处理这种情况(幂等性)。
在我的例子中是云 运行