数据流作业失败并尝试在 Bigquery 上创建 temp_dataset
Dataflow job fails and tries to create temp_dataset on Bigquery
我是 运行 一个简单的数据流作业,用于从 table 读取数据并写回另一个。
作业失败并出现错误:
Workflow failed. Causes: S01:ReadFromBQ+WriteToBigQuery/WriteToBigQuery/NativeWrite failed., BigQuery creating dataset "_dataflow_temp_dataset_18172136482196219053" in project "[my project]" failed., BigQuery execution failed., Error:
Message: Access Denied: Project [my project]: User does not have bigquery.datasets.create permission in project [my project].
不过我并没有尝试创建任何数据集,它基本上是在尝试创建一个 temp_dataset,因为作业失败了。但我没有得到任何关于幕后真正错误的信息。
阅读不是问题,真正失败的是写作步骤。我认为这与权限无关,但我的问题更多是关于如何获得真正的错误而不是这个错误。
知道如何解决这个问题吗?
代码如下:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions, WorkerOptions
from sys import argv
options = PipelineOptions(flags=argv)
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = "prj"
google_cloud_options.job_name = 'test'
google_cloud_options.service_account_email = "mysa"
google_cloud_options.staging_location = 'gs://'
google_cloud_options.temp_location = 'gs://'
options.view_as(StandardOptions).runner = 'DataflowRunner'
worker_options = options.view_as(WorkerOptions)
worker_options.subnetwork = 'subnet'
with beam.Pipeline(options=options) as p:
query = "SELECT ..."
bq_source = beam.io.BigQuerySource(query=query, use_standard_sql=True)
bq_data = p | "ReadFromBQ" >> beam.io.Read(bq_source)
table_schema = ...
bq_data | beam.io.WriteToBigQuery(
project="prj",
dataset="test",
table="test",
schema=table_schema,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
)
使用 BigQuerySource 时,SDK 会创建一个临时数据集并将查询的输出存储到临时 table。然后它从该临时 table 发出导出以从中读取结果。
因此它创建此 temp_dataset 是预期的行为。这意味着它可能没有隐藏错误。
这没有很好的记录,但可以通过读取调用在 BigQuerySource 的实现中看到:BigQuerySource.reader() --> BigQueryReader() --> BigQueryReader().__iter__() --> BigQueryWrapper.run_query() --> BigQueryWrapper._start_query_job()。
我是 运行 一个简单的数据流作业,用于从 table 读取数据并写回另一个。 作业失败并出现错误:
Workflow failed. Causes: S01:ReadFromBQ+WriteToBigQuery/WriteToBigQuery/NativeWrite failed., BigQuery creating dataset "_dataflow_temp_dataset_18172136482196219053" in project "[my project]" failed., BigQuery execution failed., Error: Message: Access Denied: Project [my project]: User does not have bigquery.datasets.create permission in project [my project].
不过我并没有尝试创建任何数据集,它基本上是在尝试创建一个 temp_dataset,因为作业失败了。但我没有得到任何关于幕后真正错误的信息。 阅读不是问题,真正失败的是写作步骤。我认为这与权限无关,但我的问题更多是关于如何获得真正的错误而不是这个错误。 知道如何解决这个问题吗?
代码如下:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions, WorkerOptions
from sys import argv
options = PipelineOptions(flags=argv)
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = "prj"
google_cloud_options.job_name = 'test'
google_cloud_options.service_account_email = "mysa"
google_cloud_options.staging_location = 'gs://'
google_cloud_options.temp_location = 'gs://'
options.view_as(StandardOptions).runner = 'DataflowRunner'
worker_options = options.view_as(WorkerOptions)
worker_options.subnetwork = 'subnet'
with beam.Pipeline(options=options) as p:
query = "SELECT ..."
bq_source = beam.io.BigQuerySource(query=query, use_standard_sql=True)
bq_data = p | "ReadFromBQ" >> beam.io.Read(bq_source)
table_schema = ...
bq_data | beam.io.WriteToBigQuery(
project="prj",
dataset="test",
table="test",
schema=table_schema,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
)
使用 BigQuerySource 时,SDK 会创建一个临时数据集并将查询的输出存储到临时 table。然后它从该临时 table 发出导出以从中读取结果。
因此它创建此 temp_dataset 是预期的行为。这意味着它可能没有隐藏错误。
这没有很好的记录,但可以通过读取调用在 BigQuerySource 的实现中看到:BigQuerySource.reader() --> BigQueryReader() --> BigQueryReader().__iter__() --> BigQueryWrapper.run_query() --> BigQueryWrapper._start_query_job()。