澳大利亚地区数据集的 Cloud Composer 问题

Cloud composer issue with datasets in Australia region

我试图使用云作曲家来安排和编排 Bigquery 作业。 Bigquery 表在 australia-southeast1 region.The cloud composer 环境是在 us-central1 区域创建的(因为 composer 在澳大利亚区域不可用)。当我尝试以下命令时,它会抛出一个模糊的错误。当我尝试使用驻留在欧盟和美国的数据集时,相同的设置工作正常。

Command:
gcloud beta composer environments run bq-schedule --location us-central1 test -- my_bigquery_dag input_gl 8-02-2018

Error:

Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 7, in <module>
    exec(compile(f.read(), __file__, 'exec'))
  File "/usr/local/lib/airflow/airflow/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/local/lib/airflow/airflow/bin/cli.py", line 528, in test
    ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True)
  File "/usr/local/lib/airflow/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/airflow/airflow/models.py", line 1583, in run
    session=session)
  File "/usr/local/lib/airflow/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/airflow/airflow/models.py", line 1492, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/airflow/airflow/contrib/operators/bigquery_operator.py", line 98, in execute
    self.create_disposition, self.query_params)
  File "/usr/local/lib/airflow/airflow/contrib/hooks/bigquery_hook.py", line 499, in run_query
    return self.run_with_configuration(configuration)
  File "/usr/local/lib/airflow/airflow/contrib/hooks/bigquery_hook.py", line 868, in run_with_configuration
    err.resp.status)
Exception: ('BigQuery job status check failed. Final error was: %s', 404)

Is there any workaround to resolve this issue? 

因为您的数据集位于 australia-southeast1,BigQuery 默认在同一位置创建了一个作业,即 australia-southeast1。但是,您的 Composer 环境中的 Airflow 试图在不指定位置字段的情况下获取作业的状态。

参考:https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/get

我的 PR 已修复此问题,并且已合并到 master。

要解决此问题,您可以扩展 BigQueryCursor 并使用位置支持覆盖 run_with_configuration() 函数。