Airflow - select bigquery table data into a dataframe
I am trying to run the following DAG in Airflow Composer on Google Cloud, but I keep getting the same error:
The conn_id hard_coded_project_name isn't defined
Maybe someone can point me in the right direction?
from airflow.models import DAG
import os
from airflow.operators.dummy import DummyOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
import datetime
import pandas as pd
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.providers.google.cloud.operators import bigquery
from airflow.contrib.hooks.bigquery_hook import BigQueryHook

default_args = {
    'start_date': datetime.datetime(2020, 1, 1),
}

PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "hard_coded_project_name")

def list_dates_in_df():
    hook = BigQueryHook(bigquery_conn_id=PROJECT_ID,
                        use_legacy_sql=False)
    bq_client = bigquery.Client(project=hook._get_field("project"),
                                credentials=hook._get_credentials())
    query = "select count(*) from LP_RAW.DIM_ACCOUNT;"
    df = bq_client.query(query).to_dataframe()

with DAG(
    'df_test',
    schedule_interval=None,
    catchup=False,
    default_args=default_args
) as dag:
    list_dates = PythonOperator(
        task_id='list_dates',
        python_callable=list_dates_in_df
    )

    list_dates
This means that PROJECT_ID, as in the line

PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "hard_coded_project_name")

was assigned the value hard_coded_project_name because GCP_PROJECT_ID has no value. Then, in the line

hook = BigQueryHook(bigquery_conn_id=PROJECT_ID...

the string hard_coded_project_name is automatically treated as a connection id in Airflow, but that connection has no value or does not exist.
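The fallback behaviour of os.environ.get can be seen in isolation. This is a minimal sketch (the "my-real-project" value is hypothetical) showing why the hard-coded default ends up being used as the connection id:

```python
import os

# When the variable is unset, os.environ.get returns its second argument;
# this is exactly how "hard_coded_project_name" becomes the conn_id
# whenever GCP_PROJECT_ID is not defined in the environment.
os.environ.pop("GCP_PROJECT_ID", None)  # ensure it is unset
fallback = os.environ.get("GCP_PROJECT_ID", "hard_coded_project_name")

os.environ["GCP_PROJECT_ID"] = "my-real-project"  # hypothetical value
from_env = os.environ.get("GCP_PROJECT_ID", "hard_coded_project_name")

print(fallback)  # hard_coded_project_name
print(from_env)  # my-real-project
```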
To avoid this error, you can do either of these steps to fix it.

- Create a connection id for both GCP_PROJECT_ID and hard_coded_project_name so we can make sure that both have values. But if we don't want to create a connection for GCP_PROJECT_ID, just make sure that hard_coded_project_name has a value so there is a fallback option. You can do this by:
  - Opening your Airflow instance.
  - Click "Admin" > "Connections".
  - Click "Create".
  - Fill up "Conn Id" and "Conn Type" as "hard_coded_project_name" and "Google Cloud Platform" respectively.
  - Fill up "Project Id" with your actual project id value.
  - Do these steps another time to create GCP_PROJECT_ID.
  - The connection should look like this (at minimum, providing the Project Id will work, but feel free to add the keyfile or its content and scopes so you won't have authentication problems moving forward):
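As an alternative to the UI steps above, Airflow can also pick up connections from AIRFLOW_CONN_<CONN_ID> environment variables. A minimal sketch of building such a connection URI that only sets the Project Id (the project id value here is hypothetical, and the extra__google_cloud_platform__project key is an assumption that may differ between Airflow versions):

```python
import urllib.parse

conn_id = "hard_coded_project_name"
project_id = "my-gcp-project"  # hypothetical project id -- use your own

# URI form of a "Google Cloud Platform" connection that only sets the
# Project Id; export it as the env var below before starting Airflow.
extra = {"extra__google_cloud_platform__project": project_id}
conn_uri = "google-cloud-platform://?" + urllib.parse.urlencode(extra)

env_var = "AIRFLOW_CONN_" + conn_id.upper()
print(env_var, "=", conn_uri)
```

Setting that variable in the Composer environment would have the same effect as creating the connection through the UI.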
- Use bigquery_default instead of hard_coded_project_name, so that by default it will point to the project that runs the Airflow instance. Your updated PROJECT_ID assignment would then be

  PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "bigquery_default")
Also, when testing your code you might encounter an error on the line

bq_client = bigquery.Client(project = hook._get_field("project")...

because Client() does not exist in from airflow.providers.google.cloud.operators import bigquery; you should use from google.cloud import bigquery instead.
Here is a snippet of my test where I only created hard_coded_project_name, so PROJECT_ID uses this connection. I got the count of my table and it worked:

Here is a snippet of the test I did using bigquery_default, where I got the count of my table and it worked: