如何使用气流中的本地 bq 运算符实现 python 任务
how to achieve python task using local bq operator from airflow
在处理项目时遇到了一种情况,我们想使用气流执行一些任务,但不允许我们使用 python 运算符,而是指示使用本地 BigQuery 运算符。任何人都可以帮助我设置气流变量或如何编写将由 BQ 操作员执行的条件代码等代码。可能吗 ?
如果不是,那么我的下一个问题是,是否有可能使用 BQ 运算符从 BQ table 获取结果并将其分配给一个 python 变量,所以想同时使用 BQ 和 python 运算符,是有什么办法吗?
因此,下面是一个示例,说明如何使用 BigQuery 运算符并使用交叉通信将数据发送到另一个任务xcom_pull
。
您可以使用 BigQueryGetDataOperator
或 BigQueryOperator
通过自定义查询来查询数据。这些操作员将为您 return 一个列表,因此您可以在另一个任务中获取它。我在示例中的 bash 运算符中使用了它:
from airflow import models
from airflow.operators.bash import BashOperator
from airflow.providers.google.cloud.operators.bigquery import (
BigQueryGetDataOperator,
)
from airflow.utils.dates import days_ago
PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "<project-name>")
BQ_LOCATION = "europe-north1"
TABLE_NAME="<table-name>"
DATASET_NAME = os.environ.get("GCP_BIGQUERY_DATASET_NAME", "<ds-name>")
with models.DAG(
"example_bigquery_operations",
schedule_interval='@once', # Override to match your needs
start_date=days_ago(1),
tags=["example"],
) as dag:
get_data = BigQueryGetDataOperator(
task_id="get_data",
dataset_id=DATASET_NAME,
table_id=TABLE_NAME,
max_results=1,
selected_fields="name",
#location=BQ_LOCATION,
)
get_dataset_result = BashOperator(
task_id="get_dataset_result",
bash_command="echo \"{{ task_instance.xcom_pull('get_data') }}\"",
)
get_data >> get_dataset_result
[2021-12-06 17:12:07,641] {logging_mixin.py:109} INFO - Running <TaskInstance: example_bigquery_operations.get_dataset_result 2021-12-05T00:00:00+00:00 [running]> on host airflow-worker-c92mz
[2021-12-06 17:12:07,937] {taskinstance.py:1254} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=example_bigquery_operations
AIRFLOW_CTX_TASK_ID=get_dataset_result
AIRFLOW_CTX_EXECUTION_DATE=2021-12-05T00:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2021-12-05T00:00:00+00:00
[2021-12-06 17:12:07,939] {subprocess.py:52} INFO - Tmp dir root location:
/tmp
[2021-12-06 17:12:07,939] {subprocess.py:63} INFO - Running command: ['bash', '-c', 'echo "[[\'Tom\']]"']
[2021-12-06 17:12:08,242] {subprocess.py:74} INFO - Output:
[2021-12-06 17:12:08,245] {subprocess.py:78} INFO - [['Tom']]
[2021-12-06 17:12:08,246] {subprocess.py:82} INFO - Command exited with return code 0
在处理项目时遇到了一种情况,我们想使用气流执行一些任务,但不允许我们使用 python 运算符,而是指示使用本地 BigQuery 运算符。任何人都可以帮助我设置气流变量或如何编写将由 BQ 操作员执行的条件代码等代码。可能吗 ? 如果不是,那么我的下一个问题是,是否有可能使用 BQ 运算符从 BQ table 获取结果并将其分配给一个 python 变量,所以想同时使用 BQ 和 python 运算符,是有什么办法吗?
因此,下面是一个示例,说明如何使用 BigQuery 运算符并使用交叉通信将数据发送到另一个任务xcom_pull
。
您可以使用 BigQueryGetDataOperator
或 BigQueryOperator
通过自定义查询来查询数据。这些操作员将为您 return 一个列表,因此您可以在另一个任务中获取它。我在示例中的 bash 运算符中使用了它:
from airflow import models
from airflow.operators.bash import BashOperator
from airflow.providers.google.cloud.operators.bigquery import (
BigQueryGetDataOperator,
)
from airflow.utils.dates import days_ago
PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "<project-name>")
BQ_LOCATION = "europe-north1"
TABLE_NAME="<table-name>"
DATASET_NAME = os.environ.get("GCP_BIGQUERY_DATASET_NAME", "<ds-name>")
with models.DAG(
"example_bigquery_operations",
schedule_interval='@once', # Override to match your needs
start_date=days_ago(1),
tags=["example"],
) as dag:
get_data = BigQueryGetDataOperator(
task_id="get_data",
dataset_id=DATASET_NAME,
table_id=TABLE_NAME,
max_results=1,
selected_fields="name",
#location=BQ_LOCATION,
)
get_dataset_result = BashOperator(
task_id="get_dataset_result",
bash_command="echo \"{{ task_instance.xcom_pull('get_data') }}\"",
)
get_data >> get_dataset_result
[2021-12-06 17:12:07,641] {logging_mixin.py:109} INFO - Running <TaskInstance: example_bigquery_operations.get_dataset_result 2021-12-05T00:00:00+00:00 [running]> on host airflow-worker-c92mz
[2021-12-06 17:12:07,937] {taskinstance.py:1254} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=example_bigquery_operations
AIRFLOW_CTX_TASK_ID=get_dataset_result
AIRFLOW_CTX_EXECUTION_DATE=2021-12-05T00:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2021-12-05T00:00:00+00:00
[2021-12-06 17:12:07,939] {subprocess.py:52} INFO - Tmp dir root location:
/tmp
[2021-12-06 17:12:07,939] {subprocess.py:63} INFO - Running command: ['bash', '-c', 'echo "[[\'Tom\']]"']
[2021-12-06 17:12:08,242] {subprocess.py:74} INFO - Output:
[2021-12-06 17:12:08,245] {subprocess.py:78} INFO - [['Tom']]
[2021-12-06 17:12:08,246] {subprocess.py:82} INFO - Command exited with return code 0