BigQuery with Airflow - missing projectId

I am trying out the following example:

https://cloud.google.com/blog/big-data/2017/07/how-to-aggregate-data-for-bigquery-using-apache-airflow

While running one of the commands:

airflow test bigquery_github_trends_v1 bq_check_githubarchive_day 2017-06-02

I get the error: TypeError: Missing required parameter "projectId"

Error stack:

[2017-09-11 16:32:26,630] {models.py:1126} INFO - Dependencies all met for <TaskInstance: bigquery_github_trends_v1.bq_check_githubarchive_day 2017-06-02 00:00:00 [None]>
[2017-09-11 16:32:26,631] {models.py:1126} INFO - Dependencies all met for <TaskInstance: bigquery_github_trends_v1.bq_check_githubarchive_day 2017-06-02 00:00:00 [None]>
[2017-09-11 16:32:26,632] {models.py:1318} INFO - 
--------------------------------------------------------------------------------
Starting attempt 1 of 6
--------------------------------------------------------------------------------

[2017-09-11 16:32:26,632] {models.py:1342} INFO - Executing <Task(BigQueryCheckOperator): bq_check_githubarchive_day> on 2017-06-02 00:00:00
[2017-09-11 16:32:26,643] {check_operator.py:75} INFO - Executing SQL check: 
#legacySql
SELECT table_id 
FROM [githubarchive:day.__TABLES__] 
WHERE table_id = "20170601"

[2017-09-11 16:32:26,646] {gcp_api_base_hook.py:73} INFO - Getting connection using `gcloud auth` user, since no key file is defined for hook.
[2017-09-11 16:32:26,671] {models.py:1417} ERROR - Missing required parameter "projectId"
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1374, in run
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/site-packages/airflow/operators/check_operator.py", line 76, in execute
    records = self.get_db_hook().get_first(self.sql)
  File "/usr/local/lib/python2.7/site-packages/airflow/hooks/dbapi_hook.py", line 135, in get_first
    cur.execute(sql)
  File "/usr/local/lib/python2.7/site-packages/airflow/contrib/hooks/bigquery_hook.py", line 752, in execute
    self.job_id = self.run_query(bql)
  File "/usr/local/lib/python2.7/site-packages/airflow/contrib/hooks/bigquery_hook.py", line 244, in run_query
    return self.run_with_configuration(configuration)
  File "/usr/local/lib/python2.7/site-packages/airflow/contrib/hooks/bigquery_hook.py", line 498, in run_with_configuration
    .insert(projectId=self.project_id, body=job_data) \
  File "/usr/local/lib/python2.7/site-packages/googleapiclient/discovery.py", line 716, in method
    raise TypeError('Missing required parameter "%s"' % name)
TypeError: Missing required parameter "projectId"

If you check the code of bigquery_hook, you will see that it looks for a project_id: https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/bigquery_hook.py#L54

The default connection is bigquery_default unless you override it. Go to the Airflow UI, then Admin --> Connections --> bigquery_default (or whichever connection you created) --> add the project ID there.
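To make the failure mode concrete, here is a minimal sketch of what the traceback suggests is happening. Both `FakeHook` and `insert_job` are hypothetical stand-ins written for illustration, not the real Airflow or googleapiclient code. The assumption is that the hook takes its project ID from the connection extras, and that the API client treats an argument passed as None as if it had not been passed at all.

```python
def insert_job(**kwargs):
    """Hypothetical stand-in for the generated jobs().insert() method."""
    required = ("projectId",)
    # Assumption: arguments whose value is None count as not supplied.
    supplied = {name: value for name, value in kwargs.items() if value is not None}
    for name in required:
        if name not in supplied:
            raise TypeError('Missing required parameter "%s"' % name)
    return supplied


class FakeHook(object):
    """Hypothetical stand-in for the hook: project_id comes from the connection extras."""

    def __init__(self, connection_extras):
        # With no project configured on the connection, this ends up as None.
        self.project_id = connection_extras.get("extra__google_cloud_platform__project")

    def run(self, job_data):
        return insert_job(projectId=self.project_id, body=job_data)


if __name__ == "__main__":
    configured = FakeHook({"extra__google_cloud_platform__project": "yourprojectid"})
    print(configured.run({"configuration": {}}))

    try:
        FakeHook({}).run({"configuration": {}})
    except TypeError as exc:
        print("unconfigured connection:", exc)
```

Under those assumptions, a connection without a project reproduces the same message as in the log, which is why filling in the project ID on the connection makes the error go away.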

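I know this is an old question, but I struggled with this too, because the UI did not work for me. I just found out how to do it via the CLI and wanted to share my findings, since it is not documented.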

There are actually 3 ways:

  1. Via an environment variable, as described here (quote the value so the shell does not interpret the &)
export AIRFLOW_CONN_BIGQUERY_DEFAULT='google-cloud-platform://:@:?extra__google_cloud_platform__project=yourprojectid&extra__google_cloud_platform__key_path=/path/to/keyfile.json'
  2. Via the CLI and a URI
airflow connections -d --conn_id bigquery_default
airflow connections -a --conn_id bigquery_default --conn_uri 'google-cloud-platform://:@:?extra__google_cloud_platform__project=yourprojectid&extra__google_cloud_platform__key_path=/path/to/keyfile.json'
  3. Via the CLI and parameters
airflow connections -d --conn_id bigquery_default
airflow connections -a --conn_id bigquery_default --conn_type google-cloud-platform --conn_extra '{"extra__google_cloud_platform__project":"yourprojectid", "extra__google_cloud_platform__key_path":"/path/to/keyfile.json"}'

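If you omit the key path, Airflow will use whatever credentials the gcloud command-line tool is currently using, which is typically your personal user account.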
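All three variants carry the same two extras, so if you script this it may help to generate the URI and the JSON from one place. The following is only a sketch: the helper names (`build_extras`, `build_conn_uri`, `build_conn_extra`) are made up for illustration, and it assumes the URI layout and field names shown in the commands above.

```python
import json
from urllib.parse import urlencode

# Field names copied from the commands in this answer.
PROJECT_FIELD = "extra__google_cloud_platform__project"
KEY_PATH_FIELD = "extra__google_cloud_platform__key_path"


def build_extras(project_id, key_path=None):
    """Collect the extras; the key path is left out when it is not given."""
    extras = {PROJECT_FIELD: project_id}
    if key_path:
        extras[KEY_PATH_FIELD] = key_path
    return extras


def build_conn_uri(project_id, key_path=None):
    """Build a value for AIRFLOW_CONN_BIGQUERY_DEFAULT or --conn_uri."""
    # safe="/" leaves the slashes in the key path unescaped, as in the answer.
    query = urlencode(build_extras(project_id, key_path), safe="/")
    return "google-cloud-platform://:@:?" + query


def build_conn_extra(project_id, key_path=None):
    """Build a value for --conn_extra."""
    return json.dumps(build_extras(project_id, key_path))


if __name__ == "__main__":
    print(build_conn_uri("yourprojectid", "/path/to/keyfile.json"))
    print(build_conn_extra("yourprojectid"))
```

Leaving `key_path` out corresponds to the case where Airflow falls back to the gcloud credentials.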

Once that is done, you can run any task that uses that connection with airflow run ... or airflow test ...
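If you went the environment-variable route, a quick sanity check before running the task can confirm that the variable is visible to the process and actually carries a project ID. This is a rough sketch using only the standard library. The function names are hypothetical, and it assumes the project is passed as a query parameter exactly as in the URI above.

```python
import os
from urllib.parse import parse_qs, urlsplit

ENV_VAR = "AIRFLOW_CONN_BIGQUERY_DEFAULT"
PROJECT_FIELD = "extra__google_cloud_platform__project"


def project_from_conn_uri(uri):
    """Return the project ID found in the URI's query string, or None."""
    values = parse_qs(urlsplit(uri).query).get(PROJECT_FIELD)
    return values[0] if values else None


def project_from_env(environ=os.environ):
    """Return the project ID from the connection variable, or None if unset."""
    uri = environ.get(ENV_VAR)
    return project_from_conn_uri(uri) if uri else None


if __name__ == "__main__":
    project = project_from_env()
    if project is None:
        print("%s is unset or has no project ID" % ENV_VAR)
    else:
        print("project ID:", project)
```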