如何使用气流 FileSensor 检索最近修改的文件

How to retrieve recently modified files using airflow FileSensor

在实际系统中,一些传感器数据将作为 csv 文件转储到特定目录中。然后一些数据管道将这些数据填充到一些数据库中。另一个管道将发送这些数据以预测服务。

我现在只有训练和验证 csv 文件。我计划模拟发送数据的流程,以按照以下方式预测服务:

DAG1 - 每 2 分钟,select 从特定路径随机获取一些文件并更新这些文件的时间戳。后面可能会选择在起始节点后加一个随机延迟

DAG2 - FileSensor 每 3 分钟戳一次。如果它找到时间戳已修改的文件子集,它应该将这些文件传递给后续阶段,最终 运行 预测服务。

在我看来,如果按原样使用FileSensor,我无法实现。我必须从 FileSensor class(比如 MyDirSensor)派生,检查所有文件的时间戳 - select 在最后一次成功戳后修改的文件并将它们转发。

我的理解对吗?如果是,对于最后一次成功戳时间戳,我可以存储在 MyDirSensor 的某个变量中吗?我可以 push/pull 这个数据 to/from xcom 吗?在这种情况下,task-id 是什么?另外,如何将这些文件列表传递给下一个任务?

这个模拟有没有更好的方法(比如不同的传感器等)?目前,我 运行 在单机上完成整个流程。我的airflow版本是1.10.15.

我不确定当前的 Airflow 方法是否最适合这个用例。在当前的化身中,Airflow 实际上是关于处理“数据间隔”的——所以基本上每个“dag 运行”都连接到某个“数据间隔”,它应该处理该数据间隔的数据。经典批处理。

如果我理解你的情况更像是流媒体(不完全)但接近。您获得自上次以来到达的一些(子集)数据并处理该数据。这不是 Airflow 的(又是当前的)版本——甚至 2.1 也不应该处理——因为存在与“数据间隔”无关的“状态”的复杂操作(并且 Airflow 目前在“数据间隔”情况下表现出色).

您确实可以做一些自定义运算符来处理这个问题。我认为 Airflow 中没有可重复使用的模式来实现你想要实现的目标,但 Airflow 足够灵活,如果你编写自己的运算符,你当然可以解决它并实现你想要的。在 Airflow 中编写运算符非常容易 - 这是一个简单的 Python class 和“执行”,它可以重用现有的 Hooks 来接触外部 services/storages 并使用 XCom 进行任务之间的通信。添加一个新的运算符来执行复杂的逻辑非常容易(并且再次使用钩子以使其更容易与外部服务通信)。为此,我认为仍然值得使用 Airflow 来做你想做的事情。

我将如何处理它 - 而不是修改文件的时间戳,我会创建其他文件 - 标记 - 具有相同的名称,不同的扩展名并基于我的处理逻辑(这样你可以使用外部存储作为“状态”)。我认为不会有现成的“操作员”或“传感器”来帮助它,但同样 - 编写自定义的很容易并且应该有效。

但是很快(几个月)在 Airflow 2.2 中(在 2.3 中甚至更多)我们将进行一些更改(主要围绕灵活的调度和解耦 dag 运行s 与数据间隔,并最终允许动态具有灵活结构的 DAG,可以根据 运行) 进行更改,这将提供一些处理类似于您的案例的好方法。

敬请期待 - 现在依靠您自己的逻辑,但在 Airflow 更适合您的情况时,请注意简化它。

与此同时 - 升级到 Airflow 2。这是非常值得的,Airflow 1.10 在 6 月结束了生命周期,所以你越早升级越好 - 因为 Airflow 1.10 不会再有任何修复(甚至重要的安全修复)