集成 BigQuery SubPub 和 Cloud Functions
Integrate BigQuery SubPub and Cloud Functions
我在一个项目中,我们需要使用 BigQuery、PubSub、Logs explorer 和 Cloud Functions。
项目:
每次发生特定事件(如用户接受 cookie)时,系统都会向 BigQuery 中插入一个新查询,其中包含许多列(参数),例如:utm_source、utm_medium、consent_cookies, 等...
在我的 table 中有了这个新查询后,我需要读取列并获取要在云函数中使用的值。
在云函数中,我想使用这些值进行 api 调用。
到目前为止我设法做了什么:
我创建了一个日志路由接收器来过滤新条目并将日志发送到我的 PubSub 主题。
我被困在哪里:
我想创建一个云函数,每次有新日志进入时都会触发,在该函数中我想访问日志中包含的信息,例如 utm_source、utm_medium, consent_cookies, 等等...并使用值进行 api 调用。
谁能帮帮我?非常非常感谢!
我做了一个项目来说明流程:
- 插入 table:
2.From 此插入在日志记录中创建一个接收器:(过滤)
- 现在每次我创建一个新查询时它都会转到 PUB/SUB 我得到查询的日志
- 我想做的是触发一个关于这个主题的函数,并使用我在查询中的值来执行调用 api 等操作...
到目前为止,我能够编写以下代码:
"use strict";
function main() {
// Import the Google Cloud client library
const { BigQuery } = require("@google-cloud/bigquery");
async function queryDb() {
const bigqueryClient = new BigQuery();
const sqlQuery = `SELECT * FROM \`mydatatable\``;
const options = {
query: sqlQuery,
location: "europe-west3",
};
// Run the query
const [rows] = await bigqueryClient.query(options);
rows.forEach((row) => {
const username = row.user_name;
});
}
queryDb();
}
main();
现在我又被卡住了,我不知道如何从我创建的接收器中获取正确的查询并使用该信息进行调用...
您有 2 个解决方案来从 PubSub 消息调用您的 Cloud Functions
- HTTP 功能:您可以设置HTTP 调用。在 trigger-http 中创建您的 Cloud Functions,并在您的 PubSub 主题上创建推送订阅以调用 Cloud Functions。不要忘记添加安全性(将您的函数设为私有并在 PubSub 上启用安全性),因为您的函数可以公开访问
- 后台功能:您可以直接将您的 Cloud Functions 绑定到 PubSub 主题。订阅会自动创建并链接到 Cloud Functions。安全性是内置的。
而且,因为您有 2 种类型的函数,所以您有 2 个不同的函数签名。我给你们两个,处理是(完全)一样的。
function extractQuery(pubSubMessage){
// Decide base64 the PubSub message
let logData = Buffer.from(pubSubMessage, 'base64').toString();
// Convert it in JSON
let logMessage= JSON.parse(logData)
// Extract the query from the log entry
let query = logMessage.protoPayload.serviceData.jobInsertRequest.resource.jobConfiguration.query.query
console.log(query)
return query
}
// For HTTP functions
exports.bigqueryQueryInLog = (req, res) => {
console.log(req.body)
const query = extractQuery(req.body.message.data)
res.status(200).send(query);
}
// For Background functions
exports.bigqueryQueryInLogTopic = (message, context) => {
extractQuery(message.data)
};
记录的 query
是您在日志条目中的 insert into...
。然后,您必须解析您的 SQL 请求以提取您想要的部分。
我在一个项目中,我们需要使用 BigQuery、PubSub、Logs explorer 和 Cloud Functions。
项目:
每次发生特定事件(如用户接受 cookie)时,系统都会向 BigQuery 中插入一个新查询,其中包含许多列(参数),例如:utm_source、utm_medium、consent_cookies, 等...
在我的 table 中有了这个新查询后,我需要读取列并获取要在云函数中使用的值。
在云函数中,我想使用这些值进行 api 调用。
到目前为止我设法做了什么:
我创建了一个日志路由接收器来过滤新条目并将日志发送到我的 PubSub 主题。
我被困在哪里:
我想创建一个云函数,每次有新日志进入时都会触发,在该函数中我想访问日志中包含的信息,例如 utm_source、utm_medium, consent_cookies, 等等...并使用值进行 api 调用。
谁能帮帮我?非常非常感谢!
我做了一个项目来说明流程:
- 插入 table:
2.From 此插入在日志记录中创建一个接收器:(过滤)
- 现在每次我创建一个新查询时它都会转到 PUB/SUB 我得到查询的日志
- 我想做的是触发一个关于这个主题的函数,并使用我在查询中的值来执行调用 api 等操作...
到目前为止,我能够编写以下代码:
"use strict";
function main() {
// Import the Google Cloud client library
const { BigQuery } = require("@google-cloud/bigquery");
async function queryDb() {
const bigqueryClient = new BigQuery();
const sqlQuery = `SELECT * FROM \`mydatatable\``;
const options = {
query: sqlQuery,
location: "europe-west3",
};
// Run the query
const [rows] = await bigqueryClient.query(options);
rows.forEach((row) => {
const username = row.user_name;
});
}
queryDb();
}
main();
现在我又被卡住了,我不知道如何从我创建的接收器中获取正确的查询并使用该信息进行调用...
您有 2 个解决方案来从 PubSub 消息调用您的 Cloud Functions
- HTTP 功能:您可以设置HTTP 调用。在 trigger-http 中创建您的 Cloud Functions,并在您的 PubSub 主题上创建推送订阅以调用 Cloud Functions。不要忘记添加安全性(将您的函数设为私有并在 PubSub 上启用安全性),因为您的函数可以公开访问
- 后台功能:您可以直接将您的 Cloud Functions 绑定到 PubSub 主题。订阅会自动创建并链接到 Cloud Functions。安全性是内置的。
而且,因为您有 2 种类型的函数,所以您有 2 个不同的函数签名。我给你们两个,处理是(完全)一样的。
function extractQuery(pubSubMessage){
// Decide base64 the PubSub message
let logData = Buffer.from(pubSubMessage, 'base64').toString();
// Convert it in JSON
let logMessage= JSON.parse(logData)
// Extract the query from the log entry
let query = logMessage.protoPayload.serviceData.jobInsertRequest.resource.jobConfiguration.query.query
console.log(query)
return query
}
// For HTTP functions
exports.bigqueryQueryInLog = (req, res) => {
console.log(req.body)
const query = extractQuery(req.body.message.data)
res.status(200).send(query);
}
// For Background functions
exports.bigqueryQueryInLogTopic = (message, context) => {
extractQuery(message.data)
};
记录的 query
是您在日志条目中的 insert into...
。然后,您必须解析您的 SQL 请求以提取您想要的部分。