如何调用按需 bigquery 数据传输服务?
How to invoke an on-demand bigquery Data transfer service?
我非常喜欢 BigQuery 的数据传输服务。我有要加载到 BQ 中的确切模式中的平面文件。如果仅设置 DTS 计划以获取与模式匹配的 GCS 文件并将其加载到 BQ 中,那就太棒了。我喜欢在复制和电子邮件后删除源文件的内置选项,以防出现问题。但最糟糕的是最小间隔是 60 分钟。这很疯狂。也许我可以延迟 10 分钟。
因此,如果我将 DTS 设置为随需应变,我如何从 API 中调用它?我正在考虑创建一个 cronjob,每 10 分钟按需调用一次。但是我无法通过文档弄清楚如何调用它。
此外,将 GCS 文件(不需要 ETL)移动到与确切模式匹配的 bq 表中的第二可靠和最便宜的方法是什么。我应该使用 Cloud Scheduler、Cloud Functions、DataFlow、Cloud 运行 等
如果我使用 Cloud Functions,如何在调用时将我的 GCS 中的所有文件作为一个 bq 加载作业提交?
最后,有谁知道DTS以后会不会把限制降到10分钟?
因此,如果我将 DTS 设置为随需应变,我如何从 API 中调用它?我正在考虑创建一个 cronjob,每 10 分钟按需调用一次。但是我无法通过文档弄清楚如何调用它。
StartManualTransferRuns
是 RPC library but does not have a REST API equivalent as of now. How to use that will depend on your environment. For instance, you can use the Python Client Library (docs) 的一部分。
作为示例,我使用了以下代码(您需要 运行 pip install google-cloud-bigquery-datatransfer
来获取依赖项):
import time
from google.cloud import bigquery_datatransfer_v1
from google.protobuf.timestamp_pb2 import Timestamp
client = bigquery_datatransfer_v1.DataTransferServiceClient()
PROJECT_ID = 'PROJECT_ID'
TRANSFER_CONFIG_ID = '5e6...7bc' # alphanumeric ID you'll find in the UI
parent = client.project_transfer_config_path(PROJECT_ID, TRANSFER_CONFIG_ID)
start_time = bigquery_datatransfer_v1.types.Timestamp(seconds=int(time.time() + 10))
response = client.start_manual_transfer_runs(parent, requested_run_time=start_time)
print(response)
请注意,您需要使用正确的传输配置 ID,并且 requested_run_time
必须是 bigquery_datatransfer_v1.types.Timestamp
类型(文档中没有相关示例)。我设置了一个比当前执行时间提前10秒的开始时间。
您应该会收到如下回复:
runs {
name: "projects/PROJECT_NUMBER/locations/us/transferConfigs/5e6...7bc/runs/5e5...c04"
destination_dataset_id: "DATASET_NAME"
schedule_time {
seconds: 1579358571
nanos: 922599371
}
...
data_source_id: "google_cloud_storage"
state: PENDING
params {
...
}
run_time {
seconds: 1579358581
}
user_id: 28...65
}
并且传输按预期触发(不要介意错误):
此外,将 GCS 文件(无需 ETL)移动到与确切模式匹配的 bq 表中的第二个最可靠和最便宜的方法是什么。我应该使用 Cloud Scheduler、Cloud Functions、DataFlow、Cloud 运行 等吗
有了这个,您可以设置一个 cron 作业,每十分钟执行一次您的函数。正如评论中所讨论的那样,最小间隔是 60 分钟,因此它不会提取不到一小时的文件 (docs)。
除此之外,这不是一个非常可靠的解决方案,您的后续问题在这里发挥作用。我认为这些可能过于广泛,无法在单个 Whosebug 问题中解决,但我会说,对于按需刷新,Cloud Scheduler + Cloud Functions/Cloud 运行 可以很好地工作。
如果您需要 ETL,Dataflow 将是最佳选择,但它有一个可以监视文件模式的 GCS 连接器 (example)。有了这个,您将跳过传输,设置监视间隔和加载作业触发频率以将文件写入 BigQuery。与以前的方法相反,VM 将 运行 不断地在流式管道中运行,但 10 分钟的观察时间是可能的。
如果你有复杂的 workflows/dependencies,Airflow 最近引入了 operators 来启动手动 运行s。
如果我使用 Cloud Functions,我如何在调用时将我的 GCS 中的所有文件作为一个 bq 加载作业提交?
您可以在创建传输时使用 wildcards 来匹配文件模式:
此外,这可以使用 Pub/Sub notifications for Cloud Storage 触发云功能逐个文件地完成。
最后,有谁知道DTS以后会不会把限制降到10分钟?
已经有功能请求here。欢迎 star 它以表达您的兴趣并接收更新
现在您可以轻松手动 运行 使用 RESTApi 传输 Bigquery 数据:
HTTP request
POST https://bigquerydatatransfer.googleapis.com/v1/{parent=projects/*/locations/*/transferConfigs/*}:startManualRuns
- 关于这部分 > {parent=projects//locations//transferConfigs/*},检查您的 Transfer 配置,然后注意如下图所示的部分。
Here
为此,您需要知道正确的 TRANSFER_CONFIG_ID.
就我而言,我想列出所有 BigQuery 计划查询,以获得特定 ID。你可以这样做:
# Put your projetID here
PROJECT_ID = 'PROJECT_ID'
from google.cloud import bigquery_datatransfer_v1
bq_transfer_client = bigquery_datatransfer_v1.DataTransferServiceClient()
parent = bq_transfer_client.project_path(PROJECT_ID)
# Iterate over all results
for element in bq_transfer_client.list_transfer_configs(parent):
# Print Display Name for each Scheduled Query
print(f'[Schedule Query Name]:\t{element.display_name}')
# Print name of all elements (it contains the ID)
print(f'[Name]:\t\t{element.name}')
# Extract the IDs:
TRANSFER_CONFIG_ID= element.name.split('/')[-1]
print(f'[TRANSFER_CONFIG_ID]:\t\t{TRANSFER_CONFIG_ID}')
# You can print the entire element for debug purposes
print(element)
根据 Guillem 的回答和 API 更新,这是我的新代码:
import time
from google.cloud.bigquery import datatransfer_v1
from google.protobuf.timestamp_pb2 import Timestamp
client = datatransfer_v1.DataTransferServiceClient()
config = '34y....654'
PROJECT_ID = 'PROJECT_ID'
TRANSFER_CONFIG_ID = config
parent = client.transfer_config_path(PROJECT_ID, TRANSFER_CONFIG_ID)
start_time = Timestamp(seconds=int(time.time()))
request = datatransfer_v1.types.StartManualTransferRunsRequest(
{ "parent": parent, "requested_run_time": start_time }
)
response = client.start_manual_transfer_runs(request, timeout=360)
print(response)
我非常喜欢 BigQuery 的数据传输服务。我有要加载到 BQ 中的确切模式中的平面文件。如果仅设置 DTS 计划以获取与模式匹配的 GCS 文件并将其加载到 BQ 中,那就太棒了。我喜欢在复制和电子邮件后删除源文件的内置选项,以防出现问题。但最糟糕的是最小间隔是 60 分钟。这很疯狂。也许我可以延迟 10 分钟。
因此,如果我将 DTS 设置为随需应变,我如何从 API 中调用它?我正在考虑创建一个 cronjob,每 10 分钟按需调用一次。但是我无法通过文档弄清楚如何调用它。
此外,将 GCS 文件(不需要 ETL)移动到与确切模式匹配的 bq 表中的第二可靠和最便宜的方法是什么。我应该使用 Cloud Scheduler、Cloud Functions、DataFlow、Cloud 运行 等
如果我使用 Cloud Functions,如何在调用时将我的 GCS 中的所有文件作为一个 bq 加载作业提交?
最后,有谁知道DTS以后会不会把限制降到10分钟?
因此,如果我将 DTS 设置为随需应变,我如何从 API 中调用它?我正在考虑创建一个 cronjob,每 10 分钟按需调用一次。但是我无法通过文档弄清楚如何调用它。
StartManualTransferRuns
是 RPC library but does not have a REST API equivalent as of now. How to use that will depend on your environment. For instance, you can use the Python Client Library (docs) 的一部分。
作为示例,我使用了以下代码(您需要 运行 pip install google-cloud-bigquery-datatransfer
来获取依赖项):
import time
from google.cloud import bigquery_datatransfer_v1
from google.protobuf.timestamp_pb2 import Timestamp
client = bigquery_datatransfer_v1.DataTransferServiceClient()
PROJECT_ID = 'PROJECT_ID'
TRANSFER_CONFIG_ID = '5e6...7bc' # alphanumeric ID you'll find in the UI
parent = client.project_transfer_config_path(PROJECT_ID, TRANSFER_CONFIG_ID)
start_time = bigquery_datatransfer_v1.types.Timestamp(seconds=int(time.time() + 10))
response = client.start_manual_transfer_runs(parent, requested_run_time=start_time)
print(response)
请注意,您需要使用正确的传输配置 ID,并且 requested_run_time
必须是 bigquery_datatransfer_v1.types.Timestamp
类型(文档中没有相关示例)。我设置了一个比当前执行时间提前10秒的开始时间。
您应该会收到如下回复:
runs {
name: "projects/PROJECT_NUMBER/locations/us/transferConfigs/5e6...7bc/runs/5e5...c04"
destination_dataset_id: "DATASET_NAME"
schedule_time {
seconds: 1579358571
nanos: 922599371
}
...
data_source_id: "google_cloud_storage"
state: PENDING
params {
...
}
run_time {
seconds: 1579358581
}
user_id: 28...65
}
并且传输按预期触发(不要介意错误):
此外,将 GCS 文件(无需 ETL)移动到与确切模式匹配的 bq 表中的第二个最可靠和最便宜的方法是什么。我应该使用 Cloud Scheduler、Cloud Functions、DataFlow、Cloud 运行 等吗
有了这个,您可以设置一个 cron 作业,每十分钟执行一次您的函数。正如评论中所讨论的那样,最小间隔是 60 分钟,因此它不会提取不到一小时的文件 (docs)。
除此之外,这不是一个非常可靠的解决方案,您的后续问题在这里发挥作用。我认为这些可能过于广泛,无法在单个 Whosebug 问题中解决,但我会说,对于按需刷新,Cloud Scheduler + Cloud Functions/Cloud 运行 可以很好地工作。
如果您需要 ETL,Dataflow 将是最佳选择,但它有一个可以监视文件模式的 GCS 连接器 (example)。有了这个,您将跳过传输,设置监视间隔和加载作业触发频率以将文件写入 BigQuery。与以前的方法相反,VM 将 运行 不断地在流式管道中运行,但 10 分钟的观察时间是可能的。
如果你有复杂的 workflows/dependencies,Airflow 最近引入了 operators 来启动手动 运行s。
如果我使用 Cloud Functions,我如何在调用时将我的 GCS 中的所有文件作为一个 bq 加载作业提交?
您可以在创建传输时使用 wildcards 来匹配文件模式:
此外,这可以使用 Pub/Sub notifications for Cloud Storage 触发云功能逐个文件地完成。
最后,有谁知道DTS以后会不会把限制降到10分钟?
已经有功能请求here。欢迎 star 它以表达您的兴趣并接收更新
现在您可以轻松手动 运行 使用 RESTApi 传输 Bigquery 数据:
HTTP request
POST https://bigquerydatatransfer.googleapis.com/v1/{parent=projects/*/locations/*/transferConfigs/*}:startManualRuns
- 关于这部分 > {parent=projects//locations//transferConfigs/*},检查您的 Transfer 配置,然后注意如下图所示的部分。
Here
为此,您需要知道正确的 TRANSFER_CONFIG_ID.
就我而言,我想列出所有 BigQuery 计划查询,以获得特定 ID。你可以这样做:
# Put your projetID here
PROJECT_ID = 'PROJECT_ID'
from google.cloud import bigquery_datatransfer_v1
bq_transfer_client = bigquery_datatransfer_v1.DataTransferServiceClient()
parent = bq_transfer_client.project_path(PROJECT_ID)
# Iterate over all results
for element in bq_transfer_client.list_transfer_configs(parent):
# Print Display Name for each Scheduled Query
print(f'[Schedule Query Name]:\t{element.display_name}')
# Print name of all elements (it contains the ID)
print(f'[Name]:\t\t{element.name}')
# Extract the IDs:
TRANSFER_CONFIG_ID= element.name.split('/')[-1]
print(f'[TRANSFER_CONFIG_ID]:\t\t{TRANSFER_CONFIG_ID}')
# You can print the entire element for debug purposes
print(element)
根据 Guillem 的回答和 API 更新,这是我的新代码:
import time
from google.cloud.bigquery import datatransfer_v1
from google.protobuf.timestamp_pb2 import Timestamp
client = datatransfer_v1.DataTransferServiceClient()
config = '34y....654'
PROJECT_ID = 'PROJECT_ID'
TRANSFER_CONFIG_ID = config
parent = client.transfer_config_path(PROJECT_ID, TRANSFER_CONFIG_ID)
start_time = Timestamp(seconds=int(time.time()))
request = datatransfer_v1.types.StartManualTransferRunsRequest(
{ "parent": parent, "requested_run_time": start_time }
)
response = client.start_manual_transfer_runs(request, timeout=360)
print(response)