如何使用 PythonOperator callables 的结果?

How to use results from PythonOperator callables?

我是 Airflow 世界的新手,我已经完成了很多教程来将我的 Python 脚本之一转换为 DAG。

基本上有四个 PythonOperator calls 调用以下内容

def taskOne():
    # Fetch data from API and create mapping dicts

def taskTwo():
    # Fetch data from another API

def taskThree():
    # Combine data from task one and two into one dataframe

def taskFour():
    # Push the dataset to Apache Kafka

声明了以下运算符来调用代码:

taskOne_operator    = PythonOperator(task_id='taskOne',     python_callable=taskOne, dag=dag)
taskTwo_operator    = PythonOperator(task_id='taskTwo',     python_callable=taskTwo, dag=dag)
taskThree_operator  = PythonOperator(task_id='taskThree',   python_callable=taskThree, dag=dag)
taskFour_operator   = PythonOperator(task_id='taskFour',    python_callable=taskFour, dag=dag)

我在任务中声明了以下依赖项:

[taskOne_operator,taskTwo_operator]>>taskThree_operator
taskThree_operator>>taskFour_operator

前两个任务运行成功,这是一件好事。 但是,显然(或者显然,但出乎我的意料)taskThree 无法找到在 taskOnetaskTwo 中创建的映射字典或数据集。 在这方面,简单地将原始 Python 脚本分成多个部分是行不通的。

NameError: name 'data' is not defined

我曾尝试使用 returns 来完成任务,但没有成功。任何人都可以分享一些关于我做错了什么的见解,也许可以指出一些关于如何解决这个问题的教程的方向吗?

谢谢!

Airflow 中的任务不会共享 Python 对象,就像它们 运行 在同一脚本中一样。每个 Airflow 任务 运行s 在一个单独的进程中。此外,运行使用 {Celery,Kubernetes}Executor 时,任务可以分布在多台机器上,因此您也不能总是假设数据可以通过本地磁盘共享。

在 Airflow 中,您可以通过多种方式在任务之间共享数据:

  • 从 XCom 推入和拉出
  • 使用 Airflow TaskFlow API
  • 处理 Airflow 外部的共享数据

从 XCom 推入和拉出

XComs(交叉通信)是 Airflow 中用于在任务之间共享数据的机制。这是通过“推”和“拉”XCom 来实现的:

def _train_model(**context):
    model_id = do_magic()
    context["task_instance"].xcom_push(key="model_id", value=model_id)

def _deploy_model(**context):
    model_id = context["task_instance"].xcom_pull(task_ids="train_model", key="model_id")
    print(f"Deploying model {model_id}")

train_model = PythonOperator(task_id="train_model", python_callable=_train_model)
deploy_model = PythonOperator(task_id="deploy_model", python_callable=_deploy_model)
train_model >> deploy_model

方法 xcom_pushxcom_pull 是在 TaskInstance class 上定义的,因此您必须从上下文中获取 context["task_instance"](有一个更短的别名 ti 如果你喜欢的话)。然后,给定值存储在 Airflow 元存储中的 table“xcom”中,也可以在 Admin -> XComs 下的 Airflow 网络服务器中查看。

请注意(默认情况下,也是可配置的)从 Python 可调用对象返回一个值将自动使用键“return_value”将返回值推送到 XCom。没有更短的方法来提取 XCom 值。

从历史上看,推送 XComs 是通过酸洗对象并将其存储在 Airflow Metastore 中来工作的。从 Airflow 2.0 开始,数据被序列化为 JSON(不过 pickle 是可配置的)。另外,您可以配置不同的位置来存储数据,而不是 Airflow Metastore。例如 AWS S3。既然你提到了数据框和 Kafka,我可以想象有很多数据,所以可能值得查看自定义后端文档以获取更多相关信息:https://airflow.apache.org/docs/apache-airflow/stable/concepts/xcoms.html#custom-backends.

使用 Airflow TaskFlow API

Airflow TaskFlow API 已添加到 Airflow 2.0 中,为 Airflow 任务提供了更实用的方法,并减少了 XCom 样板代码。这是使用 TaskFlow API:

的 DAG
import datetime

from airflow import DAG
from airflow.decorators import task

with DAG("example_dag", start_date=datetime.datetime(2021, 1, 1), schedule_interval=None) as dag:

    @task
    def task_one():
        return "one"

    @task
    def task_two():
        return "two"

    @task
    def task_three(one, two):
        return ",".join([one, two])

    @task
    def task_four(result):
        print(result)

    task_four(task_three(task_one(), task_two()))

@task 装饰器将常规 Python 函数转换为 Airflow 任务。 Return 值会自动推送到 XCom。并将一个函数的输出作为参数传递给下一个函数会自动提取 XCom 值。

为了完整性,还有 @dag 装饰器:

from airflow.decorators import dag

@dag(schedule_interval=None, start_date=datetime.datetime(2021, 1, 1))
def example_dag():
    ... your tasks ...

example_dag_ = example_dag()

处理 Airflow 外部的共享数据

在某些情况下,Airflow XCom 模型不适合。您必须在 Airflow 任务中实现自己的写入和读取数据逻辑,这提供了最大的灵活性,但显然也是最多的工作。出于各种原因可以这样做。例如,如果您的任务需要更新数据库中的记录,并且在下一个任务中再次读取更新的记录。