GCP - 从 PubSub 到 BigQuery 的消息

GCP - Message from PubSub to BigQuery

我需要从我的 pubsub 消息中获取数据并插入 bigquery。

我有:

const topicName = "-----topic-name-----";
const data = JSON.stringify({ foo: "bar" });

// Imports the Google Cloud client library
const { PubSub } = require("@google-cloud/pubsub");

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function publishMessageWithCustomAttributes() {
  // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
  const dataBuffer = Buffer.from(data);

  // Add two custom attributes, origin and username, to the message
  const customAttributes = {
    origin: "nodejs-sample",
    username: "gcp",
  };

  const messageId = await pubSubClient
    .topic(topicName)
    .publish(dataBuffer, customAttributes);
  console.log(`Message ${messageId} published.`);
}

publishMessageWithCustomAttributes().catch(console.error);

我需要从此消息中获取 data/attributes 并在 BigQuery 中查询,有人可以帮助我吗?

提前致谢!

事实上,有两种消费消息的解决方案:每条消息一条消息,或者批量。

首先,在详细介绍之前,由于您将执行 BigQuery 调用(或 Facebook API 调用),您将花费大量处理时间来等待 API 响应。


  • 每条消息的消息 如果您有可接受的消息量,则可以对每条消息执行一条消息处理。您在这里有 2 个解决方案:
  1. 您可以使用 Cloud Functions 处理每条消息。为函数设置最小内存量 (128Mb) 以限制 CPU 成本,从而限制全局成本。的确,因为你会等很多,不要花昂贵的CPU成本却什么都不做!好吧,你会慢慢处理数据,但这是一个权衡。

创建Cloud Function on the topic, or a Push Subscription to call a HTTP triggered Cloud Functions

  1. 您还可以处理 request concurrently with Cloud Run。 Cloud 运行 最多可以同时处理 250 个请求(预览版),而且因为您会等待很多时间,所以非常适合。如果您需要更多 CPU 和内存,可以将这些值增加到 4CPU 和 8Gb 内存。 这是我的首选解决方案。

  • 如果您能够轻松管理多cpu 多(轻)线程开发,则可以进行批量处理。在 Go 中很容易。 Node 中的并发也很容易 (await/async),但我不知道它是否支持多 cpu 或仅支持单 cpu。反正原理如下
  1. 创建 PubSub 主题的请求订阅
  2. 创建一个 Cloud 运行(更适合 multi-cpu,但也可以与 App Engine 或 Cloud Functions 一起使用),它将收听拉取订阅一段时间(比如说 10 分钟)
  3. 对于提取的每条消息,都会执行一个异步过程:获取 data/attribute、调用 BigQuery、确认消息
  4. pull连接超时后,关闭消息监听,完成当前消息处理并优雅退出(return 200 HTTP代码)
  5. 创建一个 Cloud Scheduler,每 10 分钟调用一次 Cloud 运行 服务。将超时设置为 15 分钟并放弃重试。
  6. 部署 Cloud 运行 服务,超时时间为 15 分钟。

此解决方案提供了更好的消息吞吐量处理(每个云 运行 服务可以处理超过 250 条消息),但没有真正的优势,因为您受到 API 的限制通话延迟。


编辑 1

代码示例

// For pubsunb triggered function
exports.logMessageTopic = (message, context) => {
    console.log("Message Content")
    console.log(Buffer.from(message.data, 'base64').toString())
    console.log("Attribute list")
    for (let key in message.attributes) {
        console.log(key + " -> " + message.attributes[key]);
    };
};


// For push subscription
exports.logMessagePush  = (req, res) => {
    console.log("Message Content")
    console.log(Buffer.from(req.body.message.data, 'base64').toString())
    console.log("Attribute list")
    for (let key in req.body.message.attributes) {
        console.log(key + " -> " + req.body.message.attributes[key]);
    };
};