Setting up Airflow with the BigQuery operator
I'm experimenting with Airflow for a data pipeline. Unfortunately, so far I haven't been able to get it working with the BigQuery operator. I've searched for a solution as best I can, but I'm still stuck. I'm running locally with the sequential executor.
Here's my code:
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from datetime import datetime, timedelta
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['example@gmail.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG(
    dag_id='bigQueryPipeline',
    default_args=default_args,
    schedule_interval=timedelta(1)
)

t1 = BigQueryOperator(
    task_id='bigquery_test',
    bql='SELECT COUNT(userId) FROM [events:EVENTS_20160501]',
    destination_dataset_table=False,
    bigquery_conn_id='bigquery_default',
    delegate_to=False,
    udf_config=False,
    dag=dag,
)
The error message:
[2016-08-27 00:13:14,665] {models.py:1327} ERROR - 'project'
Traceback (most recent call last):
  File "/Users/jean.rodrigue/anaconda/bin/airflow", line 15, in <module>
    args.func(args)
  File "/Users/jean.rodrigue/anaconda/lib/python2.7/site-packages/airflow/bin/cli.py", line 352, in test
    ti.run(force=True, ignore_dependencies=True, test_mode=True)
  File "/Users/jean.rodrigue/anaconda/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in wrapper
    result = func(*args, **kwargs)
  File "/Users/jean.rodrigue/anaconda/lib/python2.7/site-packages/airflow/models.py", line 1245, in run
    result = task_copy.execute(context=context)
  File "/Users/jean.rodrigue/anaconda/lib/python2.7/site-packages/airflow/contrib/operators/bigquery_operator.py", line 57, in execute
    conn = hook.get_conn()
  File "/Users/jean.rodrigue/anaconda/lib/python2.7/site-packages/airflow/contrib/hooks/bigquery_hook.py", line 54, in get_conn
    project = connection_extras['project']
It took me a while to finally track this down, because it isn't documented very clearly. The ERROR - 'project' is a KeyError: as the last frame of the traceback shows, the BigQuery hook looks up a 'project' key in the connection's extras and fails because it isn't set. In the Airflow UI, go to Admin -> Connections. The connection ID there is what the bigquery_conn_id parameter refers to. You have to add a JSON object to the "extras" field that defines a k,v pair for "project".
If you haven't explicitly authorized an account on the box running Airflow (gcloud auth), you also have to add keys for "service_account" and "key_path".
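For example, the "extras" field for the bigquery_default connection might look like the sketch below; this assumes the contrib BigQueryHook from the traceback above, and the project ID, service account email, and key path are placeholders to replace with your own values:

{
    "project": "my-gcp-project",
    "service_account": "airflow@my-gcp-project.iam.gserviceaccount.com",
    "key_path": "/path/to/keyfile.json"
}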
If you need to do this programmatically, here's what I use as an entrypoint in our stack to create the connection if it doesn't already exist:
from airflow.models import Connection
from airflow.settings import Session

session = Session()
gcp_conn = Connection(
    conn_id='bigquery',
    conn_type='google_cloud_platform',
    extra='{"extra__google_cloud_platform__project":"<YOUR PROJECT HERE>"}')

# Only insert the connection if one with this conn_id doesn't exist yet.
if not session.query(Connection).filter(
        Connection.conn_id == gcp_conn.conn_id).first():
    session.add(gcp_conn)
    session.commit()
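Once that has run, a task can reference the new connection by its ID. Note that the conn_id created above is 'bigquery', not the 'bigquery_default' used earlier, so the operator call would change accordingly (a minimal sketch reusing the query from the question):

t1 = BigQueryOperator(
    task_id='bigquery_test',
    bql='SELECT COUNT(userId) FROM [events:EVENTS_20160501]',
    bigquery_conn_id='bigquery',  # matches the conn_id created above
    dag=dag,
)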
Recently I fixed a similar problem by specifying both bigquery_conn_id and google_cloud_storage_conn_id, like this:
t1 = BigQueryOperator(
    task_id='bigquery_test',
    bql='SELECT COUNT(userId) FROM [events:EVENTS_20160501]',
    destination_dataset_table=False,
    bigquery_conn_id='bigquery_default',              # need both of these
    google_cloud_storage_conn_id='bigquery_default',  # because of inheritance
    delegate_to=False,
    udf_config=False,
    dag=dag,
)
See this answer for more details: