运营商之间的气流和数据传输
Airflow and data transfer between operators
我是 airflow 的新手,对 Airflow 及其处理器有疑问。
当处理器产生输出时,该输出如何在输入中移动到下一个处理器?
有一个名为 nifi 的软件将中间输出存储到流文件中,afaik 在气流中没有这样的东西。
那么这是怎么发生的呢?
提前致谢。
Airflow 使用 Xcoms 在运算符之间传递数据。
如果流程是运算符 A -> 运算符 B,则运算符 A 必须 "push" 一个值到 xcom,而运算符 B 必须 "pull" 来自 A 的值才能读取它。
A 下游的任何操作员都可以通过以下方式访问 A 推送到 Xcom 的任何值:
value = context['task_instance'].xcom_pull(task_ids='operator_a', key='key_name')
并且操作员 A 会像这样推送此值:
context['task_instance'].xcom_push(key_name,value,context['execution_date'])
也许您指的是 GenericTransfer 运算符,它有助于在数据源之间移动数据?
https://github.com/apache/incubator-airflow/blob/master/airflow/operators/generic_transfer.py
我是 airflow 的新手,对 Airflow 及其处理器有疑问。 当处理器产生输出时,该输出如何在输入中移动到下一个处理器? 有一个名为 nifi 的软件将中间输出存储到流文件中,afaik 在气流中没有这样的东西。 那么这是怎么发生的呢?
提前致谢。
Airflow 使用 Xcoms 在运算符之间传递数据。
如果流程是运算符 A -> 运算符 B,则运算符 A 必须 "push" 一个值到 xcom,而运算符 B 必须 "pull" 来自 A 的值才能读取它。
A 下游的任何操作员都可以通过以下方式访问 A 推送到 Xcom 的任何值:
value = context['task_instance'].xcom_pull(task_ids='operator_a', key='key_name')
并且操作员 A 会像这样推送此值:
context['task_instance'].xcom_push(key_name,value,context['execution_date'])
也许您指的是 GenericTransfer 运算符,它有助于在数据源之间移动数据?
https://github.com/apache/incubator-airflow/blob/master/airflow/operators/generic_transfer.py