如何使 Firebase 数据库与 BigQuery 保持同步?
How to keep a Firebase database sync with BigQuery?
我们正在进行一个涉及大量数据的项目。现在我们最近了解到 Google BigQuery。但是我们如何将数据导出到这个平台呢?我们已经看到将日志导入 Google BigQuery 的示例。但这不包含更新和删除数据的信息(仅插入)。
因此我们的对象能够更新它们的数据。我们对 BigQuery 表的查询数量有限。我们如何在不超过 BigQuery 配额限制的情况下同步我们的数据。
我们当前的功能代码:
'use strict';
// Default imports.
const functions = require('firebase-functions');
const bigQuery = require('@google-cloud/bigquery')();
// If you want to change the nodes to listen to REMEMBER TO change the constants below.
// The 'id' field is AUTOMATICALLY added to the values, so you CANNOT add it.
const ROOT_NODE = 'categories';
const VALUES = [
'name'
];
// This function listens to the supplied root node.
// When the root node is completed empty all of the Google BigQuery rows will be removed.
// This function should only activate when the root node is deleted.
exports.root = functions.database.ref(ROOT_NODE).onWrite(event => {
if (event.data.exists()) {
return;
}
return bigQuery.query({
query: [
'DELETE FROM `stampwallet.' + ROOT_NODE + '`',
'WHERE true'
].join(' '),
params: []
});
});
// This function listens to the supplied root node, but on child added/removed/changed.
// When an object is inserted/deleted/updated the appropriate action will be taken.
exports.children = functions.database.ref(ROOT_NODE + '/{id}').onWrite(event => {
const id = event.params.id;
if (!event.data.exists()) {
return bigQuery.query({
query: [
'DELETE FROM `stampwallet.' + ROOT_NODE + '`',
'WHERE id = ?'
].join(' '),
params: [
id
]
});
}
const item = event.data.val();
if (event.data.previous.exists()) {
let update = [];
for (let index = 0; index < VALUES.length; index++) {
const value = VALUES[index];
update.push(item[value]);
}
update.push(id);
return bigQuery.query({
query: [
'UPDATE `stampwallet.' + ROOT_NODE + '`',
'SET ' + VALUES.join(' = ?, ') + ' = ?',
'WHERE id = ?'
].join(' '),
params: update
});
}
let template = [];
for (let index = 0; index < VALUES.length; index++) {
template.push('?');
}
let create = [];
create.push(id);
for (let index = 0; index < VALUES.length; index++) {
const value = VALUES[index];
create.push(item[value]);
}
return bigQuery.query({
query: [
'INSERT INTO `stampwallet.' + ROOT_NODE + '` (id, ' + VALUES.join(', ') + ')',
'VALUES (?, ' + template.join(', ') + ')'
].join(' '),
params: create
});
});
将 firebase 同步到 bigquery 的最佳方式是什么?
您在 BigQuery 中找不到更新和删除函数的原因是 BigQuery 不支持它们。 BigQuery 只有追加和截断操作。如果您想更新或删除 BigQuery 中的行,您需要删除整个数据库,然后使用修改过的行或不修改行再次写入。这不是一个好主意。
BigQuery 用于存储大量数据并可以快速访问这些数据,例如,它非常适合从不同传感器收集数据。但是对于您的客户数据库,您需要使用 MySQL 或 NoSQL 数据库。
BigQuery 支持 UPDATE 和 DELETE,但不支持频繁使用 - BigQuery 是一个分析数据库,而不是事务数据库。
要将事务数据库与 BigQuery 同步,您可以使用以下方法:
- 导出每日转储,并将其导入 BigQuery。
- 将更新和删除视为新事件,并不断将事件附加到您的 BigQuery 事件日志。
- 使用类似 https://github.com/MemedDev/mysql-to-google-bigquery 的工具。
- 类似"BigQuery at WePay part III: Automating MySQL exports every 15 minutes with Airflow, and dealing with updates"
的方法
使用 Firebase,您可以安排每天从 BigQuery 的备份中加载数据:
... way to sync firebase to bigquery?
我建议考虑 streaming
您将所有数据作为历史数据存入 BigQuery。您可以将条目标记为新(插入)、更新或删除。然后,在 BigQuery 端,您可以编写查询,根据您拥有的任何逻辑解析特定记录的最新值。
因此,您的代码几乎可以 100% 重用 - 只需修复 UPDATE
/DELETE
的逻辑,使其成为 INSERT
// When an object is inserted/deleted/updated the appropriate action will be taken.
So our objects are able to update their data. And we have a limited amount of queries on the BigQuery tables. How can we synchronize our data without exceeding the BigQuery quota limits?
是的,BigQuery 支持 UPDATE
、DELETE
、INSERT
作为 Data Manipulation Language
的一部分。
BigQuery Standard SQL 于 2017 年 3 月 8 日 announced
全面上市
在考虑使用此功能将 BigQuery 与交易数据同步之前 – 请查看 Quotas
、Pricing
和 Known Issues
。
以下是一些节选!
Quotas
(节选)
DML 语句的处理成本明显高于 SELECT
语句。
• 每个 table 每天最多 UPDATE/DELETE 个语句:96
• 每个项目每天最多 UPDATE/DELETE 个语句:1,000
Pricing
(摘录,额外突出显示 + 添加评论)
BigQuery 根据查询处理的字节数对 DML 查询收费。
处理的字节数计算如下:
UPDATE Bytes processed
= 扫描的 tables 中引用字段的字节总和 + 更新的 table[=79] 中所有字段的字节总和=] 在更新开始时。
DELETE Bytes processed
= 扫描的 tables 中引用字段的字节总和 + 修改后的 table 中所有字段的字节总和删除开始。
post 作者的评论:如您所见,即使您只更新了一行,您也会为整个 table 扫描付费!我认为这是决策的关键!
Known Issues
(节选)
• DML 语句不能用于修改 table 在其架构中包含必需字段的内容。
• 每个DML 语句都会启动一个隐式事务,这意味着语句所做的更改会在每个成功的DML 语句结束时自动提交。不支持多语句事务。
• 以下 DML 语句组合允许在 table 上同时 运行:
- 更新和插入
- 删除和插入
插入和插入
否则其中一个 DML 语句将被中止。
例如,如果两个 UPDATE 语句同时针对 table 执行,那么只有其中一个会成功。
• 最近通过 BigQuery Streaming (tabledata.insertall) 写入的表无法使用 UPDATE 或 DELETE 语句进行修改。要检查 table 是否有流缓冲区,请检查 tables.get 响应中名为 streamingBuffer 的部分。如果不存在,可以使用 UPDATE 或 DELETE 语句修改 table。
我们正在进行一个涉及大量数据的项目。现在我们最近了解到 Google BigQuery。但是我们如何将数据导出到这个平台呢?我们已经看到将日志导入 Google BigQuery 的示例。但这不包含更新和删除数据的信息(仅插入)。
因此我们的对象能够更新它们的数据。我们对 BigQuery 表的查询数量有限。我们如何在不超过 BigQuery 配额限制的情况下同步我们的数据。
我们当前的功能代码:
'use strict';
// Default imports.
const functions = require('firebase-functions');
const bigQuery = require('@google-cloud/bigquery')();
// If you want to change the nodes to listen to REMEMBER TO change the constants below.
// The 'id' field is AUTOMATICALLY added to the values, so you CANNOT add it.
const ROOT_NODE = 'categories';
const VALUES = [
'name'
];
// This function listens to the supplied root node.
// When the root node is completed empty all of the Google BigQuery rows will be removed.
// This function should only activate when the root node is deleted.
exports.root = functions.database.ref(ROOT_NODE).onWrite(event => {
if (event.data.exists()) {
return;
}
return bigQuery.query({
query: [
'DELETE FROM `stampwallet.' + ROOT_NODE + '`',
'WHERE true'
].join(' '),
params: []
});
});
// This function listens to the supplied root node, but on child added/removed/changed.
// When an object is inserted/deleted/updated the appropriate action will be taken.
exports.children = functions.database.ref(ROOT_NODE + '/{id}').onWrite(event => {
const id = event.params.id;
if (!event.data.exists()) {
return bigQuery.query({
query: [
'DELETE FROM `stampwallet.' + ROOT_NODE + '`',
'WHERE id = ?'
].join(' '),
params: [
id
]
});
}
const item = event.data.val();
if (event.data.previous.exists()) {
let update = [];
for (let index = 0; index < VALUES.length; index++) {
const value = VALUES[index];
update.push(item[value]);
}
update.push(id);
return bigQuery.query({
query: [
'UPDATE `stampwallet.' + ROOT_NODE + '`',
'SET ' + VALUES.join(' = ?, ') + ' = ?',
'WHERE id = ?'
].join(' '),
params: update
});
}
let template = [];
for (let index = 0; index < VALUES.length; index++) {
template.push('?');
}
let create = [];
create.push(id);
for (let index = 0; index < VALUES.length; index++) {
const value = VALUES[index];
create.push(item[value]);
}
return bigQuery.query({
query: [
'INSERT INTO `stampwallet.' + ROOT_NODE + '` (id, ' + VALUES.join(', ') + ')',
'VALUES (?, ' + template.join(', ') + ')'
].join(' '),
params: create
});
});
将 firebase 同步到 bigquery 的最佳方式是什么?
您在 BigQuery 中找不到更新和删除函数的原因是 BigQuery 不支持它们。 BigQuery 只有追加和截断操作。如果您想更新或删除 BigQuery 中的行,您需要删除整个数据库,然后使用修改过的行或不修改行再次写入。这不是一个好主意。
BigQuery 用于存储大量数据并可以快速访问这些数据,例如,它非常适合从不同传感器收集数据。但是对于您的客户数据库,您需要使用 MySQL 或 NoSQL 数据库。
BigQuery 支持 UPDATE 和 DELETE,但不支持频繁使用 - BigQuery 是一个分析数据库,而不是事务数据库。
要将事务数据库与 BigQuery 同步,您可以使用以下方法:
- 导出每日转储,并将其导入 BigQuery。
- 将更新和删除视为新事件,并不断将事件附加到您的 BigQuery 事件日志。
- 使用类似 https://github.com/MemedDev/mysql-to-google-bigquery 的工具。
- 类似"BigQuery at WePay part III: Automating MySQL exports every 15 minutes with Airflow, and dealing with updates" 的方法
使用 Firebase,您可以安排每天从 BigQuery 的备份中加载数据:
... way to sync firebase to bigquery?
我建议考虑 streaming
您将所有数据作为历史数据存入 BigQuery。您可以将条目标记为新(插入)、更新或删除。然后,在 BigQuery 端,您可以编写查询,根据您拥有的任何逻辑解析特定记录的最新值。
因此,您的代码几乎可以 100% 重用 - 只需修复 UPDATE
/DELETE
的逻辑,使其成为 INSERT
// When an object is inserted/deleted/updated the appropriate action will be taken.
So our objects are able to update their data. And we have a limited amount of queries on the BigQuery tables. How can we synchronize our data without exceeding the BigQuery quota limits?
是的,BigQuery 支持 UPDATE
、DELETE
、INSERT
作为 Data Manipulation Language
的一部分。
BigQuery Standard SQL 于 2017 年 3 月 8 日 announced
全面上市
在考虑使用此功能将 BigQuery 与交易数据同步之前 – 请查看 Quotas
、Pricing
和 Known Issues
。
以下是一些节选!
Quotas
(节选)
DML 语句的处理成本明显高于 SELECT
语句。
• 每个 table 每天最多 UPDATE/DELETE 个语句:96
• 每个项目每天最多 UPDATE/DELETE 个语句:1,000
Pricing
(摘录,额外突出显示 + 添加评论)
BigQuery 根据查询处理的字节数对 DML 查询收费。
处理的字节数计算如下:
UPDATE Bytes processed
= 扫描的 tables 中引用字段的字节总和 + 更新的 table[=79] 中所有字段的字节总和=] 在更新开始时。
DELETE Bytes processed
= 扫描的 tables 中引用字段的字节总和 + 修改后的 table 中所有字段的字节总和删除开始。
post 作者的评论:如您所见,即使您只更新了一行,您也会为整个 table 扫描付费!我认为这是决策的关键!
Known Issues
(节选)
• DML 语句不能用于修改 table 在其架构中包含必需字段的内容。
• 每个DML 语句都会启动一个隐式事务,这意味着语句所做的更改会在每个成功的DML 语句结束时自动提交。不支持多语句事务。
• 以下 DML 语句组合允许在 table 上同时 运行:
- 更新和插入
- 删除和插入
插入和插入
否则其中一个 DML 语句将被中止。
例如,如果两个 UPDATE 语句同时针对 table 执行,那么只有其中一个会成功。
• 最近通过 BigQuery Streaming (tabledata.insertall) 写入的表无法使用 UPDATE 或 DELETE 语句进行修改。要检查 table 是否有流缓冲区,请检查 tables.get 响应中名为 streamingBuffer 的部分。如果不存在,可以使用 UPDATE 或 DELETE 语句修改 table。