如何使用气流中的本地 bq 运算符实现 python 任务

how to achieve python task using local bq operator from airflow

在处理项目时遇到了一种情况,我们想使用气流执行一些任务,但不允许我们使用 python 运算符,而是指示使用本地 BigQuery 运算符。任何人都可以帮助我设置气流变量或如何编写将由 BQ 操作员执行的条件代码等代码。可能吗 ? 如果不是,那么我的下一个问题是,是否有可能使用 BQ 运算符从 BQ table 获取结果并将其分配给一个 python 变量,所以想同时使用 BQ 和 python 运算符,是有什么办法吗?

因此,下面是一个示例,说明如何使用 BigQuery 运算符并使用交叉通信将数据发送到另一个任务xcom_pull

您可以使用 BigQueryGetDataOperatorBigQueryOperator 通过自定义查询来查询数据。这些操作员将为您 return 一个列表,因此您可以在另一个任务中获取它。我在示例中的 bash 运算符中使用了它:

from airflow import models
from airflow.operators.bash import BashOperator
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryGetDataOperator,
)
from airflow.utils.dates import days_ago

PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "<project-name>")
BQ_LOCATION = "europe-north1"
TABLE_NAME="<table-name>"
DATASET_NAME = os.environ.get("GCP_BIGQUERY_DATASET_NAME", "<ds-name>")


with models.DAG(
    "example_bigquery_operations",
    schedule_interval='@once',  # Override to match your needs
    start_date=days_ago(1),
    tags=["example"],
) as dag:

    get_data = BigQueryGetDataOperator(
        task_id="get_data",
        dataset_id=DATASET_NAME,
        table_id=TABLE_NAME,
        max_results=1,
        selected_fields="name",
        #location=BQ_LOCATION,
    )

    
    get_dataset_result = BashOperator(
        task_id="get_dataset_result",
        bash_command="echo \"{{ task_instance.xcom_pull('get_data') }}\"",
    )
    
    get_data >> get_dataset_result
[2021-12-06 17:12:07,641] {logging_mixin.py:109} INFO - Running <TaskInstance: example_bigquery_operations.get_dataset_result 2021-12-05T00:00:00+00:00 [running]> on host airflow-worker-c92mz
[2021-12-06 17:12:07,937] {taskinstance.py:1254} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=example_bigquery_operations
AIRFLOW_CTX_TASK_ID=get_dataset_result
AIRFLOW_CTX_EXECUTION_DATE=2021-12-05T00:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2021-12-05T00:00:00+00:00
[2021-12-06 17:12:07,939] {subprocess.py:52} INFO - Tmp dir root location: 
 /tmp
[2021-12-06 17:12:07,939] {subprocess.py:63} INFO - Running command: ['bash', '-c', 'echo "[[\'Tom\']]"']
[2021-12-06 17:12:08,242] {subprocess.py:74} INFO - Output:
[2021-12-06 17:12:08,245] {subprocess.py:78} INFO - [['Tom']]
[2021-12-06 17:12:08,246] {subprocess.py:82} INFO - Command exited with return code 0