
Airflow - select bigquery table data into a dataframe

I am trying to execute the DAG below in Airflow (Composer) on Google Cloud, but I keep running into the same error: conn_id hard_coded_project_name isn't defined.

Could someone point me in the right direction?

from airflow.models import DAG 
import os

from airflow.operators.dummy import DummyOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

import datetime
import pandas as pd

from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

from airflow.providers.google.cloud.operators import bigquery
from airflow.contrib.hooks.bigquery_hook import BigQueryHook

default_args = {
    'start_date': datetime.datetime(2020, 1, 1),
}

PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "hard_coded_project_name")

def list_dates_in_df():
    hook = BigQueryHook(bigquery_conn_id=PROJECT_ID,
                        use_legacy_sql=False)
    bq_client = bigquery.Client(project=hook._get_field("project"),
                                credentials=hook._get_credentials())
    query = "select count(*) from LP_RAW.DIM_ACCOUNT;"
    df = bq_client.query(query).to_dataframe()


with DAG(
        'df_test',
        schedule_interval=None,
        catchup=False,
        default_args=default_args
        ) as dag:

    list_dates = PythonOperator(
        task_id='list_dates',
        python_callable=list_dates_in_df
        )

    list_dates

This means that PROJECT_ID, as assigned in the line

PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "hard_coded_project_name")

is given the value hard_coded_project_name because GCP_PROJECT_ID has no value.

Then, in the line

hook = BigQueryHook(bigquery_conn_id=PROJECT_ID...

the string hard_coded_project_name is automatically treated as a connection ID in Airflow, but that connection has no value or does not exist.
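A quick way to confirm this is to look the connection up through Airflow's BaseHook, which raises an exception when the connection ID is not defined. This is just a sketch of a check you could run (for example from a throwaway Python task); the import path shown is the Airflow 2.x one, and on older versions the hook lives in airflow.hooks.base_hook instead:

from airflow.hooks.base import BaseHook

# Fails with a "conn_id ... isn't defined" error if no connection with this
# id exists under Admin > Connections.
conn = BaseHook.get_connection("hard_coded_project_name")
print(conn.conn_type, conn.extra)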


To avoid this error, you can take either of the following steps.

  1. Create a connection ID for both GCP_PROJECT_ID and hard_coded_project_name, so we can make sure both have values. But if we don't want to create a connection for GCP_PROJECT_ID, just make sure hard_coded_project_name has one so there is a fallback option. You can do this through the UI as follows (a programmatic alternative is sketched after this list):
  • Open your Airflow instance.
  • Click "Admin" > "Connections".
  • Click "Create".
  • Fill in "Conn Id" with "hard_coded_project_name" and "Conn Type" with "Google Cloud Platform".
  • Fill in "Project Id" with your actual project id value.
  • Repeat these steps to create GCP_PROJECT_ID.
  • At minimum, providing the Project Id is enough, but feel free to add the keyfile (or its contents) and scope as well so you won't run into authentication problems moving forward.

  2. You can use bigquery_default instead of hard_coded_project_name, so that by default it points to the project running the Airflow instance. Your updated PROJECT_ID assignment would be

    PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "bigquery_default")
    
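If you prefer not to click through the UI for the first option, the connection can also be created programmatically. The snippet below is only a sketch, assuming it runs somewhere with access to the Airflow metadata database (for example a one-off script in your Composer environment); the extra__google_cloud_platform__project key follows the naming the Google provider has used for the connection's extra field, and your-project-id is a placeholder:

from airflow.models import Connection
from airflow.settings import Session

# Sketch: register the "hard_coded_project_name" connection directly in the
# Airflow metadata database. Replace the placeholder project id before use.
conn = Connection(
    conn_id="hard_coded_project_name",
    conn_type="google_cloud_platform",
    extra='{"extra__google_cloud_platform__project": "your-project-id"}',
)

session = Session()
if not session.query(Connection).filter(Connection.conn_id == conn.conn_id).first():
    session.add(conn)
    session.commit()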

Also, when testing your code, you may encounter an error on the line

bq_client = bigquery.Client(project = hook._get_field("project")...

because Client() does not exist in from airflow.providers.google.cloud.operators import bigquery; you should use from google.cloud import bigquery instead.
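Putting both fixes together, the callable could look like the sketch below. It assumes you went with bigquery_default as the connection ID; apart from that, only the bigquery import changes compared to your original code, and the print is just there so the count shows up in the task log:

from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from google.cloud import bigquery  # the Client class lives here, not in the operators module


def list_dates_in_df():
    # Resolve project and credentials from the Airflow connection
    hook = BigQueryHook(bigquery_conn_id="bigquery_default",
                        use_legacy_sql=False)
    bq_client = bigquery.Client(project=hook._get_field("project"),
                                credentials=hook._get_credentials())
    query = "select count(*) from LP_RAW.DIM_ACCOUNT;"
    df = bq_client.query(query).to_dataframe()
    print(df)  # the count ends up in the task log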

Here is the test I did where I only created hard_coded_project_name, so PROJECT_ID uses that connection. I got the count of my table and it worked.

Here is the test I did using bigquery_default, where I also got the count of my table and it worked.