Airflow 上 S3KeySensor 中的多个文件路径

multiple filepaths in S3KeySensor on Airflow

当 S3 上的少数特定文件或目录之一发生更改时,我有一些任务需要 运行。

假设我有 PythonOperator,如果 /path/file.csv 发生变化或 /path/nested_path/some_other_file.csv 发生变化,它需要 运行。

我试过像这样创建动态 KeySensors:

    trigger_path_list = ['/path/file.csv', '//path/nested_path/some_other_file.csv']
    for trigger_path in trigger_path_list:
        file_sensor_task = S3KeySensor(
                    task_id=get_sensor_task_name(trigger_path),
                    poke_interval=30,
                    timeout=60 * 60 * 24 * 8,
                    bucket_key=os.path.join('s3://', s3_bucket_name, trigger_path),
                    wildcard_match=True)
                file_sensor_task >> main_task

但是,这意味着必须触发两个 S3KeySensor 才能对其进行处理。 我还尝试使这两项任务都独一无二,如下所示:

        for trigger_path in trigger_path_list:
            main_task = PythonOperator(
task_id='{}_task_triggered_by_{}'.format(dag_name, trigger_path), 
...)
            file_sensor_task = S3KeySensor(
                task_id=get_sensor_task_name(trigger_path),
                poke_interval=30,
                timeout=60 * 60 * 24 * 8,
                bucket_key=os.path.join('s3://', s3_bucket_name, trigger_path),
                wildcard_match=True)
            file_sensor_task >> main_task

但是,这意味着如果列表中的所有文件都没有出现,DAG 将无法完成。所以如果/path/file.csv连续出现2次,第二次就不会触发,DAG这部分就完成了。

没有办法将多个文件传递给 S3KeySensor 吗?我不想为每条路径创建一个 DAG,因为对我来说,它将是 40 DAGS x 大约 5 条路径,这给出了大约 200 个 DAG。

有什么想法吗?

对此的一些想法:

  1. 使用 Airflow 的其他任务 trigger rules, specifically you probably want one_success on the main task, which means just one of however many upstream sensors need to succeed for the task to run. This does mean other sensors will still keep running, but you could potentially use soft_failpoll_timeout 标志以避免任何失败。或者,您可以让主任务或单独的 post-cleanup 任务将 DAG 中的其余传感器标记为成功。
  2. 取决于有多少可能的路径,如果不是太多,那么也许只有一个任务传感器循环遍历路径以检查变化。一旦一条路径通过检查,您就可以 return 这样传感器就成功了。否则,如果没有路径通过,则继续轮询。

在任何一种情况下,如果您希望继续收听新文件,您仍然需要安排此 DAG frequently/non-stop。一般来说,Airflow 并不是真正为长 运行 进程设计的。如果主要任务逻辑更容易通过 Airflow 执行,您仍然可以考虑更改外部进程监视器,但随后 trigger a DAG 通过包含主要任务的 API 或 CLI。

也不确定此处是否适用或您已经考虑过的内容,但您可能对 S3 Event Notifications to more explicitly learn about changed files or directories, which could then be consumed by the SQSSensor 感兴趣。