如何使用 PythonOperator callables 的结果?
How to use results from PythonOperator callables?
我是 Airflow 世界的新手,我已经完成了很多教程来将我的 Python 脚本之一转换为 DAG。
基本上有四个 PythonOperator calls 调用以下内容
def taskOne():
# Fetch data from API and create mapping dicts
def taskTwo():
# Fetch data from another API
def taskThree():
# Combine data from task one and two into one dataframe
def taskFour():
# Push the dataset to Apache Kafka
声明了以下运算符来调用代码:
taskOne_operator = PythonOperator(task_id='taskOne', python_callable=taskOne, dag=dag)
taskTwo_operator = PythonOperator(task_id='taskTwo', python_callable=taskTwo, dag=dag)
taskThree_operator = PythonOperator(task_id='taskThree', python_callable=taskThree, dag=dag)
taskFour_operator = PythonOperator(task_id='taskFour', python_callable=taskFour, dag=dag)
我在任务中声明了以下依赖项:
[taskOne_operator,taskTwo_operator]>>taskThree_operator
taskThree_operator>>taskFour_operator
前两个任务运行成功,这是一件好事。
但是,显然(或者显然,但出乎我的意料)taskThree
无法找到在 taskOne
和 taskTwo
中创建的映射字典或数据集。
在这方面,简单地将原始 Python 脚本分成多个部分是行不通的。
NameError: name 'data' is not defined
我曾尝试使用 returns 来完成任务,但没有成功。任何人都可以分享一些关于我做错了什么的见解,也许可以指出一些关于如何解决这个问题的教程的方向吗?
谢谢!
Airflow 中的任务不会共享 Python 对象,就像它们 运行 在同一脚本中一样。每个 Airflow 任务 运行s 在一个单独的进程中。此外,运行使用 {Celery,Kubernetes}Executor 时,任务可以分布在多台机器上,因此您也不能总是假设数据可以通过本地磁盘共享。
在 Airflow 中,您可以通过多种方式在任务之间共享数据:
- 从 XCom 推入和拉出
- 使用 Airflow TaskFlow API
- 处理 Airflow 外部的共享数据
从 XCom 推入和拉出
XComs(交叉通信)是 Airflow 中用于在任务之间共享数据的机制。这是通过“推”和“拉”XCom 来实现的:
def _train_model(**context):
model_id = do_magic()
context["task_instance"].xcom_push(key="model_id", value=model_id)
def _deploy_model(**context):
model_id = context["task_instance"].xcom_pull(task_ids="train_model", key="model_id")
print(f"Deploying model {model_id}")
train_model = PythonOperator(task_id="train_model", python_callable=_train_model)
deploy_model = PythonOperator(task_id="deploy_model", python_callable=_deploy_model)
train_model >> deploy_model
方法 xcom_push
和 xcom_pull
是在 TaskInstance class 上定义的,因此您必须从上下文中获取 context["task_instance"]
(有一个更短的别名 ti
如果你喜欢的话)。然后,给定值存储在 Airflow 元存储中的 table“xcom”中,也可以在 Admin -> XComs 下的 Airflow 网络服务器中查看。
请注意(默认情况下,也是可配置的)从 Python 可调用对象返回一个值将自动使用键“return_value”将返回值推送到 XCom。没有更短的方法来提取 XCom 值。
从历史上看,推送 XComs 是通过酸洗对象并将其存储在 Airflow Metastore 中来工作的。从 Airflow 2.0 开始,数据被序列化为 JSON(不过 pickle 是可配置的)。另外,您可以配置不同的位置来存储数据,而不是 Airflow Metastore。例如 AWS S3。既然你提到了数据框和 Kafka,我可以想象有很多数据,所以可能值得查看自定义后端文档以获取更多相关信息:https://airflow.apache.org/docs/apache-airflow/stable/concepts/xcoms.html#custom-backends.
使用 Airflow TaskFlow API
Airflow TaskFlow API 已添加到 Airflow 2.0 中,为 Airflow 任务提供了更实用的方法,并减少了 XCom 样板代码。这是使用 TaskFlow API:
的 DAG
import datetime
from airflow import DAG
from airflow.decorators import task
with DAG("example_dag", start_date=datetime.datetime(2021, 1, 1), schedule_interval=None) as dag:
@task
def task_one():
return "one"
@task
def task_two():
return "two"
@task
def task_three(one, two):
return ",".join([one, two])
@task
def task_four(result):
print(result)
task_four(task_three(task_one(), task_two()))
@task
装饰器将常规 Python 函数转换为 Airflow 任务。 Return 值会自动推送到 XCom。并将一个函数的输出作为参数传递给下一个函数会自动提取 XCom 值。
为了完整性,还有 @dag
装饰器:
from airflow.decorators import dag
@dag(schedule_interval=None, start_date=datetime.datetime(2021, 1, 1))
def example_dag():
... your tasks ...
example_dag_ = example_dag()
处理 Airflow 外部的共享数据
在某些情况下,Airflow XCom 模型不适合。您必须在 Airflow 任务中实现自己的写入和读取数据逻辑,这提供了最大的灵活性,但显然也是最多的工作。出于各种原因可以这样做。例如,如果您的任务需要更新数据库中的记录,并且在下一个任务中再次读取更新的记录。
我是 Airflow 世界的新手,我已经完成了很多教程来将我的 Python 脚本之一转换为 DAG。
基本上有四个 PythonOperator calls 调用以下内容
def taskOne():
# Fetch data from API and create mapping dicts
def taskTwo():
# Fetch data from another API
def taskThree():
# Combine data from task one and two into one dataframe
def taskFour():
# Push the dataset to Apache Kafka
声明了以下运算符来调用代码:
taskOne_operator = PythonOperator(task_id='taskOne', python_callable=taskOne, dag=dag)
taskTwo_operator = PythonOperator(task_id='taskTwo', python_callable=taskTwo, dag=dag)
taskThree_operator = PythonOperator(task_id='taskThree', python_callable=taskThree, dag=dag)
taskFour_operator = PythonOperator(task_id='taskFour', python_callable=taskFour, dag=dag)
我在任务中声明了以下依赖项:
[taskOne_operator,taskTwo_operator]>>taskThree_operator
taskThree_operator>>taskFour_operator
前两个任务运行成功,这是一件好事。
但是,显然(或者显然,但出乎我的意料)taskThree
无法找到在 taskOne
和 taskTwo
中创建的映射字典或数据集。
在这方面,简单地将原始 Python 脚本分成多个部分是行不通的。
NameError: name 'data' is not defined
我曾尝试使用 returns 来完成任务,但没有成功。任何人都可以分享一些关于我做错了什么的见解,也许可以指出一些关于如何解决这个问题的教程的方向吗?
谢谢!
Airflow 中的任务不会共享 Python 对象,就像它们 运行 在同一脚本中一样。每个 Airflow 任务 运行s 在一个单独的进程中。此外,运行使用 {Celery,Kubernetes}Executor 时,任务可以分布在多台机器上,因此您也不能总是假设数据可以通过本地磁盘共享。
在 Airflow 中,您可以通过多种方式在任务之间共享数据:
- 从 XCom 推入和拉出
- 使用 Airflow TaskFlow API
- 处理 Airflow 外部的共享数据
从 XCom 推入和拉出
XComs(交叉通信)是 Airflow 中用于在任务之间共享数据的机制。这是通过“推”和“拉”XCom 来实现的:
def _train_model(**context):
model_id = do_magic()
context["task_instance"].xcom_push(key="model_id", value=model_id)
def _deploy_model(**context):
model_id = context["task_instance"].xcom_pull(task_ids="train_model", key="model_id")
print(f"Deploying model {model_id}")
train_model = PythonOperator(task_id="train_model", python_callable=_train_model)
deploy_model = PythonOperator(task_id="deploy_model", python_callable=_deploy_model)
train_model >> deploy_model
方法 xcom_push
和 xcom_pull
是在 TaskInstance class 上定义的,因此您必须从上下文中获取 context["task_instance"]
(有一个更短的别名 ti
如果你喜欢的话)。然后,给定值存储在 Airflow 元存储中的 table“xcom”中,也可以在 Admin -> XComs 下的 Airflow 网络服务器中查看。
请注意(默认情况下,也是可配置的)从 Python 可调用对象返回一个值将自动使用键“return_value”将返回值推送到 XCom。没有更短的方法来提取 XCom 值。
从历史上看,推送 XComs 是通过酸洗对象并将其存储在 Airflow Metastore 中来工作的。从 Airflow 2.0 开始,数据被序列化为 JSON(不过 pickle 是可配置的)。另外,您可以配置不同的位置来存储数据,而不是 Airflow Metastore。例如 AWS S3。既然你提到了数据框和 Kafka,我可以想象有很多数据,所以可能值得查看自定义后端文档以获取更多相关信息:https://airflow.apache.org/docs/apache-airflow/stable/concepts/xcoms.html#custom-backends.
使用 Airflow TaskFlow API
Airflow TaskFlow API 已添加到 Airflow 2.0 中,为 Airflow 任务提供了更实用的方法,并减少了 XCom 样板代码。这是使用 TaskFlow API:
的 DAGimport datetime
from airflow import DAG
from airflow.decorators import task
with DAG("example_dag", start_date=datetime.datetime(2021, 1, 1), schedule_interval=None) as dag:
@task
def task_one():
return "one"
@task
def task_two():
return "two"
@task
def task_three(one, two):
return ",".join([one, two])
@task
def task_four(result):
print(result)
task_four(task_three(task_one(), task_two()))
@task
装饰器将常规 Python 函数转换为 Airflow 任务。 Return 值会自动推送到 XCom。并将一个函数的输出作为参数传递给下一个函数会自动提取 XCom 值。
为了完整性,还有 @dag
装饰器:
from airflow.decorators import dag
@dag(schedule_interval=None, start_date=datetime.datetime(2021, 1, 1))
def example_dag():
... your tasks ...
example_dag_ = example_dag()
处理 Airflow 外部的共享数据
在某些情况下,Airflow XCom 模型不适合。您必须在 Airflow 任务中实现自己的写入和读取数据逻辑,这提供了最大的灵活性,但显然也是最多的工作。出于各种原因可以这样做。例如,如果您的任务需要更新数据库中的记录,并且在下一个任务中再次读取更新的记录。