如何更新 BigQuery 流缓冲区中的数据?
How to update data in the BigQuery stream buffer?
我的工作流程
使用云函数将数据从 Pub/Sub 流式传输到 BigQuery。
数据在流缓冲区中停留 90 分钟,因此我无法执行更新语句。
我需要在那之前更新结果列。
请帮忙。
我在“Pub/Sub”中接收数据,然后触发“云函数”,将数据插入“ BigQuery"
这是代码:
const { BigQuery } = require('@google-cloud/bigquery');
const bigquery = new BigQuery();
exports.telemetryToBigQuery = (data, context) => {
if (!data.data) {
throw new Error('No telemetry data was provided!');
return;
}
//Data comes in as base64
console.log(`raw data: ${data.data}`);
//Data gets decoded from base64 to string
const dataDataDecode = Buffer.from(data.data, 'base64').toString();
var indexesSemicolons = [];
for (var i = 0; i < dataDataDecode.length; i++) {
if (dataDataDecode[i] === ";") {
indexesSemicolons.push(i);
}
}
if (indexesSemicolons.length == 14) {
const brand = dataDataDecode.slice(0, indexesSemicolons[0]);
const model = dataDataDecode.slice(indexesSemicolons[0] + 1, indexesSemicolons[1]);
const result = dataDataDecode.slice(indexesSemicolons[1] + 1, indexesSemicolons[2]);
async function insertRowsAsStream() {
// Inserts the JSON objects into my_dataset:my_table.
const datasetId = 'put your dataset here';
const tableId = 'put table id here';
const rows = [
{
Brand: brand,
Model: model,
Result: result
}
];
// Insert data into a table
await bigquery
.dataset(datasetId)
.table(tableId)
.insert(rows);
console.log(`Inserted ${rows.length} rows`);
}
insertRowsAsStream();
} else {
console.log("Invalid message");
return;
}
}
此数据在 BigQuery 流缓冲区中保留了大约 90 分钟,但我需要执行更新查询来更改结果列。这是不允许的,会导致错误
ApiError: UPDATE or DELETE statement over table pti-tag-copy.ContainerData2.voorinfo would affect rows in the streaming buffer, which is not supported at new ApiError
我需要一种在 90 分钟缓冲时间之前更新结果的方法。你们能帮帮我吗
我在线阅读了以下页面
我阅读了以下问题的答案,我想我明白他在说什么,但我不知道如何执行它。
如果我是对的,他说的是将我的数据流式传输到临时 table,然后从那里将其放入永久 table。
Whosebug DML Update bigQuery
是的,没错。当数据流式传输时,您不能使用 DML。解决方案是查询流缓冲区中的数据并在另一个 table 中转换它们。正如你所说,它可能是暂时的,并将它们永久地沉没 table。
你也可以认为来自PubSub的流数据是原始数据,你想保留它们,然后你需要在另一个tables中细化数据。它也是一种常见的数据工程模式,对最终有用的数据(也称为数据集市)进行不同层的过滤和转换
回答你的问题。是的,它说您应该将数据流式传输到临时 table 并将其复制到另一个永久 table 并且在原始 table 中您可以启用过期时间。这意味着 table 将在过期时间后被删除。
您可以更改过滤器,使其不包含可能位于当前流缓冲区中的数据。如果您在更新数据时使用分区 table 的情况,您可以添加一个 WHERE
子句,其中时间戳的间隔为 40 到 90 分钟,例如:
WHERE Partitiontime < TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 40 MINUTE).
我现在正在使用 BigQuery 创建作业方法。
示例找到 here
我直接把数据放在table里面,这样流缓冲就不用等90分钟了
我的工作流程
使用云函数将数据从 Pub/Sub 流式传输到 BigQuery。
数据在流缓冲区中停留 90 分钟,因此我无法执行更新语句。
我需要在那之前更新结果列。
请帮忙。
我在“Pub/Sub”中接收数据,然后触发“云函数”,将数据插入“ BigQuery"
这是代码:
const { BigQuery } = require('@google-cloud/bigquery');
const bigquery = new BigQuery();
exports.telemetryToBigQuery = (data, context) => {
if (!data.data) {
throw new Error('No telemetry data was provided!');
return;
}
//Data comes in as base64
console.log(`raw data: ${data.data}`);
//Data gets decoded from base64 to string
const dataDataDecode = Buffer.from(data.data, 'base64').toString();
var indexesSemicolons = [];
for (var i = 0; i < dataDataDecode.length; i++) {
if (dataDataDecode[i] === ";") {
indexesSemicolons.push(i);
}
}
if (indexesSemicolons.length == 14) {
const brand = dataDataDecode.slice(0, indexesSemicolons[0]);
const model = dataDataDecode.slice(indexesSemicolons[0] + 1, indexesSemicolons[1]);
const result = dataDataDecode.slice(indexesSemicolons[1] + 1, indexesSemicolons[2]);
async function insertRowsAsStream() {
// Inserts the JSON objects into my_dataset:my_table.
const datasetId = 'put your dataset here';
const tableId = 'put table id here';
const rows = [
{
Brand: brand,
Model: model,
Result: result
}
];
// Insert data into a table
await bigquery
.dataset(datasetId)
.table(tableId)
.insert(rows);
console.log(`Inserted ${rows.length} rows`);
}
insertRowsAsStream();
} else {
console.log("Invalid message");
return;
}
}
此数据在 BigQuery 流缓冲区中保留了大约 90 分钟,但我需要执行更新查询来更改结果列。这是不允许的,会导致错误
ApiError: UPDATE or DELETE statement over table pti-tag-copy.ContainerData2.voorinfo would affect rows in the streaming buffer, which is not supported at new ApiError
我需要一种在 90 分钟缓冲时间之前更新结果的方法。你们能帮帮我吗
我在线阅读了以下页面
我阅读了以下问题的答案,我想我明白他在说什么,但我不知道如何执行它。
如果我是对的,他说的是将我的数据流式传输到临时 table,然后从那里将其放入永久 table。
Whosebug DML Update bigQuery
是的,没错。当数据流式传输时,您不能使用 DML。解决方案是查询流缓冲区中的数据并在另一个 table 中转换它们。正如你所说,它可能是暂时的,并将它们永久地沉没 table。
你也可以认为来自PubSub的流数据是原始数据,你想保留它们,然后你需要在另一个tables中细化数据。它也是一种常见的数据工程模式,对最终有用的数据(也称为数据集市)进行不同层的过滤和转换
回答你的问题。是的,它说您应该将数据流式传输到临时 table 并将其复制到另一个永久 table 并且在原始 table 中您可以启用过期时间。这意味着 table 将在过期时间后被删除。
您可以更改过滤器,使其不包含可能位于当前流缓冲区中的数据。如果您在更新数据时使用分区 table 的情况,您可以添加一个 WHERE
子句,其中时间戳的间隔为 40 到 90 分钟,例如:
WHERE Partitiontime < TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 40 MINUTE).
我现在正在使用 BigQuery 创建作业方法。
示例找到 here
我直接把数据放在table里面,这样流缓冲就不用等90分钟了