GCP PubSub 长轮询

GCP PubSub long polling

GCP PubSub JS SDK 是否支持长轮询?

我希望能够同时处理多个 PubSub 消息,例如:

每 1 分钟有 1000 条消息发送到该主题。

假设在接下来的 10 秒内,将向一个主题发送 50 条消息。我希望我订阅一个 10 秒的长轮询,而不是处理每条消息。它将等待 10 秒并可能收到所有 50 条消息。

AWS JS SDK 有这个功能,我希望我也能在 GCP 上实现。

https://docs.aws.amazon.com/sdk-for-javascript/v3/developer-guide/sqs-examples-enable-long-polling.html

这是它如何在 AWS 上运行的示例:

  1. SQS 队列有超过 5 条消息。
  2. 听众 receiveMessage 将在单个 receive 中同时收到 5 条消息。事件
// Load the AWS SDK for Node.js
var AWS = require("aws-sdk");
// Set the region
AWS.config.update({ region: "REGION" });
// Set the AWS Region
const REGION = "us-east-1"; //e.g. "us-east-1"

// Set the parameters
const queueURL =
  "https://sqs.us-east-1.amazonaws.com/763335115465/long-polling-per-message"; // SQS_QUEUE_URL
const params = {
  AttributeNames: ["SentTimestamp"],
  MaxNumberOfMessages: 5,
  MessageAttributeNames: ["All"],
  QueueUrl: queueURL,
  WaitTimeSeconds: 20,
};

// Create SQS service object
const sqs = new AWS.SQS({
  region: REGION,
  credentials: {
    accessKeyId: "xx",
    secretAccessKey: "xxx",
  },
});

sqs.receiveMessage(params, function (err, data) {
  console.log({ err, data: JSON.stringify(data) });
  if (err) {
    console.log("Receive Error", err);
  } else if (data.Messages) {
    var deleteParams = {
      QueueUrl: queueURL,
      ReceiptHandle: data.Messages[0].ReceiptHandle,
    };
    sqs.deleteMessage(deleteParams, function (err, data) {
      if (err) {
        console.log("Delete Error", err);
      } else {
        console.log("Message Deleted", data);
      }
    });
  }
});


{
  "ResponseMetadata": { "RequestId": "25295507-c4ae-5106-a499-0d7808c163b8" },
  "Messages": [
    {
      "MessageId": "5dbd863e-2c50-49c8-9c4b-9f70e8db8d17",
      "ReceiptHandle": "asdf",
      "MD5OfBody": "78ef53e38c997c445f2fe1cc63c13139",
      "Body": "Test5",
      "Attributes": { "SentTimestamp": "1610991641728" }
    },
    {
      "MessageId": "09baf624-f2ee-4173-83ed-e74c0516a7e6",
      "ReceiptHandle": "asdf",
      "MD5OfBody": "c454552d52d55d3ef56408742887362b",
      "Body": "Test2",
      "Attributes": { "SentTimestamp": "1610991983369" }
    },
    {
      "MessageId": "1cac914f-d946-434a-87a0-974b14cc2eba",
      "ReceiptHandle": "asdf",
      "MD5OfBody": "b3f66ec1535de7702c38e94408fa4a17",
      "Body": "Test3",
      "Attributes": { "SentTimestamp": "1610991986299" }
    },
    {
      "MessageId": "95c2c8ad-fc7a-451a-b967-8ce1736a4cab",
      "ReceiptHandle": "asdf",
      "MD5OfBody": "f178860b5109214d9f3debe19a7800d3",
      "Body": "Test7",
      "Attributes": { "SentTimestamp": "1610991998129" }
    },
    {
      "MessageId": "3711fa29-9bbc-418d-a35f-7adbd7daa952",
      "ReceiptHandle": "asd",
      "MD5OfBody": "b6e30158b9d7d2dc8bb4f4123fe93c9b",
      "Body": "Test10",
      "Attributes": { "SentTimestamp": "1610992008975" }
    }
  ]
}

是的,你有它并且记录在案 here。设置要取消订阅的超时值。因此,如果您想再等 10 秒,请添加 10.000 毫秒!

Cloud Pub/Sub JS 库提供了一个流式的、每条消息 API 用于接收来自订阅的消息。没有办法告诉库给你一批 N 条消息,所以你必须自己实现它。