在 Google 云中流式传输 Insert/Update - BigQuery
Streaming Insert/Update in Google Cloud - BigQuery
我正在尝试将 Salesforce 数据流式传输到 Google Cloud BigQuery。已经实现了流式插入,例如:每当在 SF 中创建新的潜在客户时,都会将其插入到 BigQuery 表中。请问有没有办法可以对数据进行 Upsert?我知道存在流式缓冲区,它不允许对刚插入的数据执行任何 DML 操作,因为这些数据会在流式缓冲区中停留一小段时间。
非常感谢有关 Upsert 部分的任何提示
已编辑 - 2019 年 6 月 6 日
使用下面的云函数插入记录
/**
* Responds to any HTTP request.
*
* @param {!express:Request} req HTTP request context.
* @param {!express:Response} res HTTP response context.
*/
exports.helloWorld = (req, res) => {
let message = req.query.mes || req.body.mes || 'Hello World!';
res.status(200).send(req.body);
var d =JSON.stringify(req.body);
console.log(d);
var e = d.replace(/:""/g, '');
var f = e.replace(/\/g, '');
var g = f.replace(/"{n /g, '');
var h = g.replace(/n}"/g, '');
var i = h.replace(/n /g, '');
console.log(i);
const {BigQuery} = require('@google-cloud/bigquery');
const bigquery = new BigQuery();
var instance = "DEMO";
var table = "HTTP";
bigquery
.dataset(instance)
.table(table)
.insert(JSON.parse(i),
{'ignoreUnknownValues':true, 'raw':false})
.then ((data) => {
console.log('Inserted 1 rows');
console.log(data);
})
};
用于更新的代码
exports.helloWorld = (req, res) => {
let message = req.query.mes || req.body.mes || 'Hello World!';
res.status(200).send(req.body);
var d =JSON.stringify(req.body);
console.log(d);
var e = d.replace(/:""/g, '');
var f = e.replace(/\/g, '');
var g = f.replace(/"{n /g, '');
var h = g.replace(/n}"/g, '');
var i = h.replace(/n /g, '');
console.log(i);
var j = JSON.parse(i);
var k = JSON.stringify(j.Id);
var id = k.replace(/"/g, '');
console.log(k);
console.log(id);
const {BigQuery} = require('@google-cloud/bigquery');
const bigquery = new BigQuery();
var instance = "DEMO";
var table = "LEADS_STG";
bigquery
.dataset(instance)
.table(table)
.insert(JSON.parse(i),
{'ignoreUnknownValues':true, 'raw':false})
.then ((data) => {
console.log('Inserted 1 rows');
console.log(data);
})
/*const bigqueryClient = new BigQuery();*/
var delayInMilliseconds = 1000;
setTimeout(function() {
bigquery.query({
query: [
'MERGE DEMO.LEADS_D T USING (SELECT ID,NAME,LEADSOURCE,COMPANY FROM DEMO.LEADS_STG where ID= ? order by LASTMODIFIEDDATE DESC LIMIT 1) S ON T.ID = S.ID WHEN MATCHED THEN UPDATE SET NAME = S.NAME, LEADSOURCE = S.LEADSOURCE, COMPANY = S.COMPANY WHEN NOT MATCHED THEN INSERT ( ID, NAME, LEADSOURCE, COMPANY) VALUES( ID, NAME,LEADSOURCE,COMPANY)'
].join(' '),
params: [
id
]
}, function(err, rows) {});
}, delayInMilliseconds);
};
我正在尝试将 Salesforce 数据流式传输到 Google Cloud BigQuery。已经实现了流式插入,例如:每当在 SF 中创建新的潜在客户时,都会将其插入到 BigQuery 表中。请问有没有办法可以对数据进行 Upsert?我知道存在流式缓冲区,它不允许对刚插入的数据执行任何 DML 操作,因为这些数据会在流式缓冲区中停留一小段时间。
非常感谢有关 Upsert 部分的任何提示
已编辑 - 2019 年 6 月 6 日
使用下面的云函数插入记录
/**
* Responds to any HTTP request.
*
* @param {!express:Request} req HTTP request context.
* @param {!express:Response} res HTTP response context.
*/
exports.helloWorld = (req, res) => {
let message = req.query.mes || req.body.mes || 'Hello World!';
res.status(200).send(req.body);
var d =JSON.stringify(req.body);
console.log(d);
var e = d.replace(/:""/g, '');
var f = e.replace(/\/g, '');
var g = f.replace(/"{n /g, '');
var h = g.replace(/n}"/g, '');
var i = h.replace(/n /g, '');
console.log(i);
const {BigQuery} = require('@google-cloud/bigquery');
const bigquery = new BigQuery();
var instance = "DEMO";
var table = "HTTP";
bigquery
.dataset(instance)
.table(table)
.insert(JSON.parse(i),
{'ignoreUnknownValues':true, 'raw':false})
.then ((data) => {
console.log('Inserted 1 rows');
console.log(data);
})
};
用于更新的代码
exports.helloWorld = (req, res) => {
let message = req.query.mes || req.body.mes || 'Hello World!';
res.status(200).send(req.body);
var d =JSON.stringify(req.body);
console.log(d);
var e = d.replace(/:""/g, '');
var f = e.replace(/\/g, '');
var g = f.replace(/"{n /g, '');
var h = g.replace(/n}"/g, '');
var i = h.replace(/n /g, '');
console.log(i);
var j = JSON.parse(i);
var k = JSON.stringify(j.Id);
var id = k.replace(/"/g, '');
console.log(k);
console.log(id);
const {BigQuery} = require('@google-cloud/bigquery');
const bigquery = new BigQuery();
var instance = "DEMO";
var table = "LEADS_STG";
bigquery
.dataset(instance)
.table(table)
.insert(JSON.parse(i),
{'ignoreUnknownValues':true, 'raw':false})
.then ((data) => {
console.log('Inserted 1 rows');
console.log(data);
})
/*const bigqueryClient = new BigQuery();*/
var delayInMilliseconds = 1000;
setTimeout(function() {
bigquery.query({
query: [
'MERGE DEMO.LEADS_D T USING (SELECT ID,NAME,LEADSOURCE,COMPANY FROM DEMO.LEADS_STG where ID= ? order by LASTMODIFIEDDATE DESC LIMIT 1) S ON T.ID = S.ID WHEN MATCHED THEN UPDATE SET NAME = S.NAME, LEADSOURCE = S.LEADSOURCE, COMPANY = S.COMPANY WHEN NOT MATCHED THEN INSERT ( ID, NAME, LEADSOURCE, COMPANY) VALUES( ID, NAME,LEADSOURCE,COMPANY)'
].join(' '),
params: [
id
]
}, function(err, rows) {});
}, delayInMilliseconds);
};