有什么方法可以将任务的 return 值存储在 Python 变量中并与下游任务共享(不使用 xcom 或 airflow 变量)

Is there any way to store the return value of a task in Python variable and share it with downstream tasks (without using xcom or airflow variable)

我正在编写一个 airflow dag,它将从数据库中读取一堆配置,然后使用 bash 运算符执行一系列 Python 脚本。之前读取的配置将作为参数传递。

问题是我没有找到与其他下游操作员共享配置的有效方法。我设计了下面的 dag。以下是我的担忧。

  1. 我不确定将进行多少次数据库调用来获取 jinja 模板中所需的值(在下面的示例中)。

  2. 除了每个任务中的配置都相同之外,我不确定每次都从数据库中获取它是否是个好主意。 这也是我不想使用 xcom 的原因。 我使用了 airflow 变量,因为 JSON 解析可以在一行中进行。但是,我猜还是存在数据库调用问题。

class ReturningMySqlOperator(MySqlOperator):
    def execute(self, context):
        hook = MySqlHook(mysql_conn_id=self.mysql_conn_id,
                         schema=self.database)
        s = hook.get_pandas_df(sql=self.sql, parameters=self.parameters)
        s = s.set_index('laptopName', drop=False)
        print(s)
        s = s.to_json(orient='index')
        Variable.set('jobconfig', s)



t1 = ReturningMySqlOperator(
    task_id='mysql_query',
    sql='SELECT * FROM laptops',
    mysql_conn_id='mysql_db_temp',
    dag=dag)



t3 = BashOperator(
    task_id='sequence_one',
    bash_command='python3 path/sequence1.py {{var.json.jobconfig.Legion.laptopName}} {{var.json.jobconfig.Legion.company}}',
    dag=dag)

t4 = BashOperator(
    task_id='sequence_two',
    bash_command='python3 path/sequence2.py {{var.json.jobconfig.Legion.laptopName}} {{var.json.jobconfig.Legion.company}}',
    dag=dag)

t5 = BashOperator(
    task_id='sequence_three',
    bash_command='python3 path/sequence3.py {{var.json.jobconfig.Legion.laptopName}} {{var.json.jobconfig.Legion.company}}',
    dag=dag)

t6 = BashOperator(
    task_id='sequence_four',
    bash_command='python3 path/sequence4.py {{var.json.jobconfig.Legion.laptopName}} {{var.json.jobconfig.Legion.company}}',
    dag=dag)

t1 >> t3 
t3 >> [t4,t6]

第一点:

I am not sure how many DB calls will be made to fetch the values required inside the jinja templates (in the below example).

在您提供的示例中,您在每个 sequence_x 任务中建立两个到元数据数据库的连接,每个 {{var.json.jobconfig.xx}} 调用一个。好消息是那些没有被调度程序执行,所以不是每个心跳间隔都执行。来自 Astronomer guide:

Since all top-level code in DAG files is interpreted every scheduler "heartbeat," macros and templating allow run-time tasks to be offloaded to the executor instead of the scheduler.

第二点:

我认为这里的关键方面是你要传递给下游的值总是相同的,并且在你执行后不会改变T1。 这里可能有一些方法,但如果你想尽量减少对数据库的调用次数,并完全避免 XComs,你应该使用 TriggerDagRunOperator.

为此,您必须将 DAG 分成两部分,让 controller DAG 负责获取数据的任务从 MySQL,触发第二个 DAG,在其中使用从控制器获得的值执行所有 BashOperator DAG。您可以使用 conf 参数传入数据。

这里根据官方给出的例子Airflow example DAGs:

控制器 DAG:

from airflow import DAG
from airflow.models import Variable
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago


def _data_from_mysql():
    # fetch data from the DB or anywhere else
    # set a Variable
    data = {'legion': {'company': 'some_company', 'laptop': 'great_laptop'}}
    Variable.set('jobconfig', data, serialize_json=True)


dag = DAG(
    dag_id="example_trigger_controller_dag",
    default_args={"owner": "airflow"},
    start_date=days_ago(2),
    schedule_interval="@once",
    tags=['example'],
)

get_data_from_MySql = PythonOperator(
    task_id='get_data_from_MySql',
    python_callable=_data_from_mysql,
)

trigger = TriggerDagRunOperator(
    task_id="test_trigger_dagrun",
    # Ensure this equals the dag_id of the DAG to trigger
    trigger_dag_id="example_trigger_target_dag",
    conf={"message": "Company is {{var.json.jobconfig.legion.company}}"},
    execution_date='{{ds}}',
    dag=dag,
)
get_data_from_MySql >> trigger

trigger 任务执行时,将包含密钥 message 作为第二个 DAG 运行 配置的一部分DAG.

目标 DAG:

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

dag = DAG(
    dag_id="example_trigger_target_dag",
    default_args={"owner": "airflow"},
    start_date=days_ago(2),
    schedule_interval=None,
    tags=['example'],
)


def run_this_func(**context):
    """
    Print the payload "message" passed to the DagRun conf attribute.

    :param context: The execution context
    :type context: dict
    """
    print("Remotely received value of {} for key=message".format(
        context["dag_run"].conf["message"]))


run_this = PythonOperator(
    task_id="run_this", python_callable=run_this_func, dag=dag)

bash_task_1 = BashOperator(
    task_id="bash_task_1",
    bash_command='echo "Here is the message: $message"',
    env={'message': '{{ dag_run.conf["message"] if dag_run else "" }}'},
    dag=dag
)

本例中 bash_task_1 的日志将包括:

[2021-05-05 15:40:35,410] {bash.py:158} INFO - Running command: echo "Here is the message: $message"
[2021-05-05 15:40:35,418] {bash.py:169} INFO - Output:
[2021-05-05 15:40:35,419] {bash.py:173} INFO - Here is the message: Company is some_company
[2021-05-05 15:40:35,420] {bash.py:177} INFO - Command exited with return code 0

回顾:

  • 一个任务从数据库中获取数据并将其设置为 Variable
  • 触发第二个 DAG 传递 Variable 中的数据 conf
  • 在您的目标 DAG 中使用来自 dag_run.conf
  • 的数据

这样,当第二个 DAG 被触发时,您只会从 metadaba DB 中读取一次。

此外,为了避免在 BashOperator 任务定义期间重复太多代码,您可以这样做:

templated_bash_cmd = """
python3 {{params.path_to_script}} {{dag_run.conf["laptopName"]}} {{dag_run.conf["company"]}}
"""

bash_task_1 = BashOperator(
    task_id="bash_task_1",
    bash_command=templated_bash_cmd,
    params={
        'path_to_script': 'path/sequence1.py'
    },
    dag=dag
)

让我知道这是否对您有用!