如何使用 python 实时监控 Bigquery 中特定 table 的 Stackdriver 日志?
How to monitor Stackdriver logs for a specific table in Bigquery in real time using python?
我正在使用 BI 工具查看 Bigquery 中的数据。数据分布在多个 table 中,因此我在查询中有多个 JOINS 以获取我需要的格式的数据。由于处理所有这些数据需要一分钟时间,而且我想要一种实时体验,因此我创建了一个计划查询 运行 该查询确实加入并将输出保存到 table . table 中的数据每 30 分钟从 ETL 工具提供一次(增量加载)。这会创建 Bigquery Job 来加载数据。
我有一个特定的 table,在完成此 table 的工作后,我想 运行 预定的查询。
我在计划查询中禁用了计划并使其仅在 API 调用期间可以 运行。我写了一个 python 脚本,它向预定查询发送 API 请求。
python 中是否有任何方法可以实时监控特定 Bigquery table 的日志,因此当特定 table 的作业状态更改为 'Succeeded' 时,我将向 运行 Scheduled query?
发送 API scheduled query 请求
我看到了Stackdriver logging python code,看来我要反复发出API请求来模拟实时监控。我似乎无法针对特定 table 过滤 API 请求中的结果,我编写了一些脚本来从日志结果中为我执行此操作。
有没有原生的库?
如果您提到的过滤器是正确的,这应该有效:
from google.cloud import logging
from google.cloud.logging import DESCENDING
filter = 'resource.type="bigquery_resource" AND protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.load.destinationTable.tableId="tableID" AND log_name="projects/projectID/logs/cloudaudit.googleapis.com%2Fdata_access" AND proto_payload.method_name="jobservice.jobcompleted'
for element in logging_client.list_entries(order_by=DESCENDING, filter_=filter):
.... YOUR LOGIC HERE ...
如果您需要实时解决方案(一些等待日志到达的代码),您应该自己实施。上面的代码将带来所有与您的过滤器相关的日志,并按日期降序排列。如果您只想列出最后的日志,您还应该更改您的过滤器,添加一个时间戳过滤器。
如果您有任何问题,请随时问我。
希望对你有帮助
找到了解决方案,但它使用了其他 Google 服务。
Cloud Logging
有一个名为 Sink
的功能,我们可以将日志定向到 Cloud Pub/Sub
主题。
我们可以从 Cloud Pub/Sub
.
调用 Cloud Functions
Cloud Functions
将使用 python 代码向 Scheduled query
发送 API 请求。
我正在使用 BI 工具查看 Bigquery 中的数据。数据分布在多个 table 中,因此我在查询中有多个 JOINS 以获取我需要的格式的数据。由于处理所有这些数据需要一分钟时间,而且我想要一种实时体验,因此我创建了一个计划查询 运行 该查询确实加入并将输出保存到 table . table 中的数据每 30 分钟从 ETL 工具提供一次(增量加载)。这会创建 Bigquery Job 来加载数据。
我有一个特定的 table,在完成此 table 的工作后,我想 运行 预定的查询。
我在计划查询中禁用了计划并使其仅在 API 调用期间可以 运行。我写了一个 python 脚本,它向预定查询发送 API 请求。
python 中是否有任何方法可以实时监控特定 Bigquery table 的日志,因此当特定 table 的作业状态更改为 'Succeeded' 时,我将向 运行 Scheduled query?
发送 API scheduled query 请求我看到了Stackdriver logging python code,看来我要反复发出API请求来模拟实时监控。我似乎无法针对特定 table 过滤 API 请求中的结果,我编写了一些脚本来从日志结果中为我执行此操作。
有没有原生的库?
如果您提到的过滤器是正确的,这应该有效:
from google.cloud import logging
from google.cloud.logging import DESCENDING
filter = 'resource.type="bigquery_resource" AND protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.load.destinationTable.tableId="tableID" AND log_name="projects/projectID/logs/cloudaudit.googleapis.com%2Fdata_access" AND proto_payload.method_name="jobservice.jobcompleted'
for element in logging_client.list_entries(order_by=DESCENDING, filter_=filter):
.... YOUR LOGIC HERE ...
如果您需要实时解决方案(一些等待日志到达的代码),您应该自己实施。上面的代码将带来所有与您的过滤器相关的日志,并按日期降序排列。如果您只想列出最后的日志,您还应该更改您的过滤器,添加一个时间戳过滤器。
如果您有任何问题,请随时问我。 希望对你有帮助
找到了解决方案,但它使用了其他 Google 服务。
Cloud Logging
有一个名为 Sink
的功能,我们可以将日志定向到 Cloud Pub/Sub
主题。
我们可以从 Cloud Pub/Sub
.
Cloud Functions
Cloud Functions
将使用 python 代码向 Scheduled query
发送 API 请求。