气流数据处理
Airflow Data Handling
Airflow 新手,通过 Udemy 学习并阅读文档。
哪位好心人可以帮助我理解必须非常基础的东西,但我似乎无法找到一个好的解释,现在它阻碍了我的进步。
一个任务或 activity 如何将其结果传递给下一个任务以进行后续处理?我明白 tasks/activities 应该是幂等的;例如,任务 #1 获取了一些数据,任务 #2 对其做了一些事情,每次他们 运行 都会发生同样的事情。
我意识到没有数据从一个任务流向另一个任务的概念,因为 Airflow 实际上是协调器。
让我们以类似 SimpleHttpOperator 的情况为例,我在任务中使用它从 Web 资源获取一些数据,当没有出现时,我如何才能使该数据可用于下一个任务是 SimpleHttpOperator 中的任何方法,我可以看到,它允许我控制 where 来放置该数据,以便下一个任务可以使用它。
必须有一些 'standard' 方法来放置 SimpleHttpOperator 检索到的数据,以便下一个任务能够(读取它并)使用它?
我确定我一定遗漏了一些非常基本的东西,非常感谢任何指点。
最近我的回答是:
您可以使用 XComs 在任务之间传递元数据。正如您正确注意到的那样,Airflow 不应传递数据。所以你永远不应该使用 SimpleHTTPOperator 来检索数据。您可以做的最好的事情是使用它来检索您传递给另一个操作员的一些元数据,以便它知道如何处理存储在其他地方的数据(例如在 GCS 存储桶或 S3 中)。
通常,Airflow 会调用一些外部服务,将其放置在某个地方和 returns 元数据中数据的位置。
然而事情发生了变化。
最近,我们引入了自定义 XCom 后端(例如可以是 S3 或 GCS),您可以使用完全相同的 XComs 来实际传递数据。
我们刚刚 完成了 2021 年 Airflow 峰会,您可以在此处查看有关整个自定义 xcom 后端概念的更多信息:https://airflowsummit.org/sessions/2021/customizing-xcom-to-enhance-data-sharing-between-tasks/
新闻界新鲜出炉!
Airflow 新手,通过 Udemy 学习并阅读文档。
哪位好心人可以帮助我理解必须非常基础的东西,但我似乎无法找到一个好的解释,现在它阻碍了我的进步。
一个任务或 activity 如何将其结果传递给下一个任务以进行后续处理?我明白 tasks/activities 应该是幂等的;例如,任务 #1 获取了一些数据,任务 #2 对其做了一些事情,每次他们 运行 都会发生同样的事情。
我意识到没有数据从一个任务流向另一个任务的概念,因为 Airflow 实际上是协调器。
让我们以类似 SimpleHttpOperator 的情况为例,我在任务中使用它从 Web 资源获取一些数据,当没有出现时,我如何才能使该数据可用于下一个任务是 SimpleHttpOperator 中的任何方法,我可以看到,它允许我控制 where 来放置该数据,以便下一个任务可以使用它。
必须有一些 'standard' 方法来放置 SimpleHttpOperator 检索到的数据,以便下一个任务能够(读取它并)使用它?
我确定我一定遗漏了一些非常基本的东西,非常感谢任何指点。
最近我的回答是:
您可以使用 XComs 在任务之间传递元数据。正如您正确注意到的那样,Airflow 不应传递数据。所以你永远不应该使用 SimpleHTTPOperator 来检索数据。您可以做的最好的事情是使用它来检索您传递给另一个操作员的一些元数据,以便它知道如何处理存储在其他地方的数据(例如在 GCS 存储桶或 S3 中)。
通常,Airflow 会调用一些外部服务,将其放置在某个地方和 returns 元数据中数据的位置。
然而事情发生了变化。
最近,我们引入了自定义 XCom 后端(例如可以是 S3 或 GCS),您可以使用完全相同的 XComs 来实际传递数据。
我们刚刚 完成了 2021 年 Airflow 峰会,您可以在此处查看有关整个自定义 xcom 后端概念的更多信息:https://airflowsummit.org/sessions/2021/customizing-xcom-to-enhance-data-sharing-between-tasks/
新闻界新鲜出炉!