如何更新 BigQuery 流缓冲区中的数据?

How to update data in the BigQuery stream buffer?

我的工作流程

我在“Pub/Sub”中接收数据,然后触发“云函数”,将数据插入“ BigQuery"

这是代码:

const { BigQuery } = require('@google-cloud/bigquery');
const bigquery = new BigQuery();


exports.telemetryToBigQuery = (data, context) => {

  if (!data.data) {
    throw new Error('No telemetry data was provided!');
    return;
  }

  //Data comes in as base64
  console.log(`raw data: ${data.data}`);

  //Data gets decoded from base64 to string
  const dataDataDecode = Buffer.from(data.data, 'base64').toString();


var indexesSemicolons = [];

for (var i = 0; i < dataDataDecode.length; i++) {
    if (dataDataDecode[i] === ";") {
        indexesSemicolons.push(i);
    }
}

if (indexesSemicolons.length == 14) {

     const brand = dataDataDecode.slice(0, indexesSemicolons[0]);
     const model = dataDataDecode.slice(indexesSemicolons[0] + 1, indexesSemicolons[1]);
     const result = dataDataDecode.slice(indexesSemicolons[1] + 1, indexesSemicolons[2]);

    async function insertRowsAsStream() {
      // Inserts the JSON objects into my_dataset:my_table.

    
      const datasetId = 'put your dataset here';
      const tableId = 'put table id here';
      const rows = [
        {
          Brand: brand,
          Model: model,
          Result: result

        }
      ];

      // Insert data into a table
      await bigquery
        .dataset(datasetId)
        .table(tableId)
        .insert(rows);
      console.log(`Inserted ${rows.length} rows`);
    }
    insertRowsAsStream();
  } else {
    console.log("Invalid message");
    return;
  }
}

此数据在 BigQuery 流缓冲区中保留了大约 90 分钟,但我需要执行更新查询来更改结果列。这是不允许的,会导致错误

ApiError: UPDATE or DELETE statement over table pti-tag-copy.ContainerData2.voorinfo would affect rows in the streaming buffer, which is not supported at new ApiError

我需要一种在 90 分钟缓冲时间之前更新结果的方法。你们能帮帮我吗

我在线阅读了以下页面

Life of a BigQuery stream

我阅读了以下问题的答案,我想我明白他在说什么,但我不知道如何执行它。

如果我是对的,他说的是将我的数据流式传输到临时 table,然后从那里将其放入永久 table。

Whosebug DML Update bigQuery

是的,没错。当数据流式传输时,您不能使用 DML。解决方案是查询流缓冲区中的数据并在另一个 table 中转换它们。正如你所说,它可能是暂时的,并将它们永久地沉没 table。

你也可以认为来自PubSub的流数据是原始数据,你想保留它们,然后你需要在另一个tables中细化数据。它也是一种常见的数据工程模式,对最终有用的数据(也称为数据集市)进行不同层的过滤和转换

回答你的问题。是的,它说您应该将数据流式传输到临时 table 并将其复制到另一个永久 table 并且在原始 table 中您可以启用过期时间。这意味着 table 将在过期时间后被删除。

您可以更改过滤器,使其不包含可能位于当前流缓冲区中的数据。如果您在更新数据时使用分区 table 的情况,您可以添加一个 WHERE 子句,其中时间戳的间隔为 40 到 90 分钟,例如:

WHERE Partitiontime < TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 40 MINUTE).

我现在正在使用 BigQuery 创建作业方法。

示例找到 here

我直接把数据放在table里面,这样流缓冲就不用等90分钟了