从 Pubsub 在 BigQuery 中编写查询
Write query in BigQuery from Pubsub
需要一些帮助。
我正在接收来自 Pub/Sub 主题的消息,需要提取消息中的数据,并通过后台云函数(由 PUB/SUB 触发)在 BigQuery 中执行插入查询……
我设法做到的:
/**
 * Triggered from a message on a Cloud Pub/Sub topic.
 *
 * @param {!Object} message Event payload (Pub/Sub message; `data` is base64-encoded).
 * @param {!Object} context Metadata for the event.
 */
exports.insertBigQuery = (message, context) => {
extractQuery(message.data);
};
// Decodes and parses the Pub/Sub payload, logs its fields, then kicks off main().
function extractQuery(pubSubMessage){
// Decode the base64-encoded PubSub message into a UTF-8 string
let logData = Buffer.from(pubSubMessage, 'base64').toString();
// Parse the decoded string as JSON
let logMessage= JSON.parse(logData);
// Log each expected field for debugging
console.log(logMessage.customerToken)
console.log(logMessage.fbclid)
console.log(logMessage.fbc)
console.log(logMessage.fbp)
console.log(logMessage.firstHitTS)
console.log(logMessage.consentFB)
// NOTE(review): main() receives no arguments, so the extracted values
// never reach the query — this is exactly the problem the question asks about.
main();
return logMessage
}
"use strict"; // NOTE(review): a directive only at the top of a file/function body; here it is a no-op statement.
// Runs a single hard-coded INSERT job against BigQuery.
function main() {
const { BigQuery } = require("@google-cloud/bigquery");
const bigquery = new BigQuery();
async function query() {
// The VALUES below are hard-coded — the question asks how to use the
// fields extracted in extractQuery() instead.
const query = `INSERT INTO MYTABLE( customerToken, fbclid, fbc, fbp, firstHitTS, consentFB)
VALUES ("customerTokenSCRIPTCLOUD","fbclidSCRIPT"," fbcSCRIPTCLOUD"," fbpSCRIPTCLOUD","2021-01-05",TRUE )`;
const options = {
query: query,
location: "US",
};
// Start the query job, then wait for it and print any result rows.
const [job] = await bigquery.createQueryJob(options);
console.log(`Job ${job.id} started.`);
const [rows] = await job.getQueryResults();
console.log("Rows:");
rows.forEach((row) => console.log(row));
}
// NOTE(review): query() is async but its promise is not awaited, so
// rejections go unhandled and the Cloud Function may finish too early.
query();
}
现在每次收到消息时,我都会在 bigQuery 中查询,但我的 VALUES 是硬编码的,正如您在此处看到的那样:
// The VALUES below are hard-coded — this is the part the question wants to
// replace with fields taken from the Pub/Sub message.
const query = `INSERT INTO devsensetestprojects.TestDataSet.fbSimpleData( customerToken, fbclid, fbc, fbp, firstHitTS, consentFB)
VALUES ("customerTokenSCRIPTCLOUD","fbclidSCRIPT"," fbcSCRIPTCLOUD"," fbpSCRIPTCLOUD","2021-01-05",TRUE )`;
我做不到的是:从 extractQuery(pubSubMessage) 函数中取出这些值并在查询里使用它们
(就像函数里的 logMessage.SOMEVALUE 那样),从而得到我需要的正确的值。
提前致谢!
正如你所说,你是开发新手。这里有一段更简洁高效的代码。我没有测试过,但它更接近你想要的效果。如果有哪部分你觉得难以理解,请告诉我!
// Make them global to load them only when the Cloud Function instance is created
// They will be reused in the subsequent processing and until the instance deletion
const { BigQuery } = require("@google-cloud/bigquery");
const bigquery = new BigQuery();
exports.insertBigQuery = async (message, context) => {
// Decode base64 the PubSub message
let logData = Buffer.from(message.data, 'base64').toString();
// Convert it in JSON
let logMessage= JSON.parse(logData);
const query = createQuery(logMessage)
const options = {
query: query,
location: "US",
};
const [job] = await bigquery.createQueryJob(options);
console.log(`Job ${job.id} started.`);
// Only wait the end of the job. Theere is no row as answer, it's only an insert
await job.getQueryResults();
}
// Builds the INSERT statement from the decoded Pub/Sub message.
//
// BUG FIX: the original template literal never interpolated the fields —
// the literal text "logMessage.customerToken" (etc.) was sent to BigQuery
// instead of the values. `${...}` interpolation is required.
//
// NOTE(review): string-built SQL is injection-prone. In production, prefer
// BigQuery query parameters (pass `params` in the createQueryJob options);
// the escaping below is a minimal safeguard only.
function createQuery(logMessage) {
  // Escape double quotes so a value cannot break out of its string literal.
  const esc = (v) => String(v).replace(/"/g, '\\"');
  // You may have to format logMessage.firstHitTS so BigQuery accepts it as a date.
  return `INSERT INTO MYTABLE(customerToken, fbclid, fbc, fbp, firstHitTS, consentFB)
  VALUES ("${esc(logMessage.customerToken)}", "${esc(logMessage.fbclid)}", "${esc(logMessage.fbc)}", "${esc(logMessage.fbp)}",
  "${esc(logMessage.firstHitTS)}", ${logMessage.consentFB ? "TRUE" : "FALSE"})`;
}
需要一些帮助。
我正在接收来自 Pub/Sub 主题的消息,需要提取消息中的数据,并通过后台云函数(由 PUB/SUB 触发)在 BigQuery 中执行插入查询……
我设法做到的:
/**
 * Triggered from a message on a Cloud Pub/Sub topic.
 *
 * @param {!Object} message Event payload (Pub/Sub message; `data` is base64-encoded).
 * @param {!Object} context Metadata for the event.
 */
exports.insertBigQuery = (message, context) => {
extractQuery(message.data);
};
// Decodes and parses the Pub/Sub payload, logs its fields, then kicks off main().
function extractQuery(pubSubMessage){
// Decode the base64-encoded PubSub message into a UTF-8 string
let logData = Buffer.from(pubSubMessage, 'base64').toString();
// Parse the decoded string as JSON
let logMessage= JSON.parse(logData);
// Log each expected field for debugging
console.log(logMessage.customerToken)
console.log(logMessage.fbclid)
console.log(logMessage.fbc)
console.log(logMessage.fbp)
console.log(logMessage.firstHitTS)
console.log(logMessage.consentFB)
// NOTE(review): main() receives no arguments, so the extracted values
// never reach the query — this is exactly the problem the question asks about.
main();
return logMessage
}
"use strict"; // NOTE(review): a directive only at the top of a file/function body; here it is a no-op statement.
// Runs a single hard-coded INSERT job against BigQuery.
function main() {
const { BigQuery } = require("@google-cloud/bigquery");
const bigquery = new BigQuery();
async function query() {
// The VALUES below are hard-coded — the question asks how to use the
// fields extracted in extractQuery() instead.
const query = `INSERT INTO MYTABLE( customerToken, fbclid, fbc, fbp, firstHitTS, consentFB)
VALUES ("customerTokenSCRIPTCLOUD","fbclidSCRIPT"," fbcSCRIPTCLOUD"," fbpSCRIPTCLOUD","2021-01-05",TRUE )`;
const options = {
query: query,
location: "US",
};
// Start the query job, then wait for it and print any result rows.
const [job] = await bigquery.createQueryJob(options);
console.log(`Job ${job.id} started.`);
const [rows] = await job.getQueryResults();
console.log("Rows:");
rows.forEach((row) => console.log(row));
}
// NOTE(review): query() is async but its promise is not awaited, so
// rejections go unhandled and the Cloud Function may finish too early.
query();
}
现在每次收到消息时,我都会在 bigQuery 中查询,但我的 VALUES 是硬编码的,正如您在此处看到的那样:
// The VALUES below are hard-coded — this is the part the question wants to
// replace with fields taken from the Pub/Sub message.
const query = `INSERT INTO devsensetestprojects.TestDataSet.fbSimpleData( customerToken, fbclid, fbc, fbp, firstHitTS, consentFB)
VALUES ("customerTokenSCRIPTCLOUD","fbclidSCRIPT"," fbcSCRIPTCLOUD"," fbpSCRIPTCLOUD","2021-01-05",TRUE )`;
我做不到的是:从 extractQuery(pubSubMessage) 函数中取出这些值并在查询里使用它们
(就像函数里的 logMessage.SOMEVALUE 那样),从而得到我需要的正确的值。
提前致谢!
正如你所说,你是开发新手。这里有一段更简洁高效的代码。我没有测试过,但它更接近你想要的效果。如果有哪部分你觉得难以理解,请告诉我!
// Make them global to load them only when the Cloud Function instance is created
// They will be reused in the subsequent processing and until the instance deletion
const { BigQuery } = require("@google-cloud/bigquery");
const bigquery = new BigQuery();
exports.insertBigQuery = async (message, context) => {
// Decode base64 the PubSub message
let logData = Buffer.from(message.data, 'base64').toString();
// Convert it in JSON
let logMessage= JSON.parse(logData);
const query = createQuery(logMessage)
const options = {
query: query,
location: "US",
};
const [job] = await bigquery.createQueryJob(options);
console.log(`Job ${job.id} started.`);
// Only wait the end of the job. Theere is no row as answer, it's only an insert
await job.getQueryResults();
}
// Builds the INSERT statement from the decoded Pub/Sub message.
//
// BUG FIX: the original template literal never interpolated the fields —
// the literal text "logMessage.customerToken" (etc.) was sent to BigQuery
// instead of the values. `${...}` interpolation is required.
//
// NOTE(review): string-built SQL is injection-prone. In production, prefer
// BigQuery query parameters (pass `params` in the createQueryJob options);
// the escaping below is a minimal safeguard only.
function createQuery(logMessage) {
  // Escape double quotes so a value cannot break out of its string literal.
  const esc = (v) => String(v).replace(/"/g, '\\"');
  // You may have to format logMessage.firstHitTS so BigQuery accepts it as a date.
  return `INSERT INTO MYTABLE(customerToken, fbclid, fbc, fbp, firstHitTS, consentFB)
  VALUES ("${esc(logMessage.customerToken)}", "${esc(logMessage.fbclid)}", "${esc(logMessage.fbc)}", "${esc(logMessage.fbp)}",
  "${esc(logMessage.firstHitTS)}", ${logMessage.consentFB ? "TRUE" : "FALSE"})`;
}