Airflow 2 中的 S3KeySensor

S3KeySensor in Airflow 2

我有一个名为 my_dag.py 的 dag,它利用 Airflow 2 中的 S3KeySensor 检查是否存在 s3 密钥。当我直接在 dag 内部使用传感器时,它起作用了:

with TaskGroup('check_exists') as check_exists: 
    
  path = 's3://my-bucket/data/my_file'
  poke_interval = 30
  timeout = 60*60
  mode = 'reschedule'
  dependency_name = 'my_file'

  S3KeySensor(
    task_id = 'check_' + dependency_name + '_exists',
    bucket_key = path,
    poke_interval = poke_interval,
    timeout = timeout,
    mode = mode
  )

上面的日志看起来像:

[2022-05-03, 19:51:26 UTC] {s3.py:105} INFO - Poking for key : s3://my-bucket/data/my_file
[2022-05-03, 19:51:26 UTC] {base_aws.py:90} INFO - Retrieving region_name from Connection.extra_config['region_name']
[2022-05-03, 19:51:27 UTC] {taskinstance.py:1701} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE

这是正确的。预计会重新安排,因为该文件尚不存在。

但是,我想检查其他 dag 中的任意数量的路径,所以我将传感器移动到另一个名为 helpers.py 的文件中名为 test 的函数中。我在调用 test 的任务组中的 my_dag.py 中使用 python 运算符。它看起来像这样:

with TaskGroup('check_exists') as check_exists:

  path = 's3://my-bucket/data/my_file'
  dependency_name = 'my_file'

  wait_for_dependencies = PythonOperator(
    task_id = 'wait_for_my_file',
    python_callable = test,
    op_kwargs = {
      'dependency_name': dependency_name,
      'path': path
    },
    dag = dag
  )

  wait_for_dependencies

helpers.py 中的函数 test 看起来像:

def test(dependency_name, path, poke_interval = 30, timeout = 60 * 60, mode = 'reschedule'):

    S3KeySensor(
        task_id = 'check_' + dependency_name + '_exists',
        bucket_key = path,
        poke_interval = poke_interval,
        timeout = timeout,
        mode = mode
    )

但是,当我 运行 dag 时,该步骤被标记为成功,即使该文件不存在。日志显示:

[2022-05-03, 20:07:54 UTC] {python.py:175} INFO - Done. Returned value was: None
[2022-05-03, 20:07:54 UTC] {taskinstance.py:1282} INFO - Marking task as SUCCESS.

气流似乎不喜欢通过 python 操作员使用传感器。这是真的?还是我做错了什么?

我的目标是遍历多条路径并检查每条路径是否存在。但是,我在其他 dag 中这样做,这就是为什么我将传感器放在驻留在另一个文件中的函数中的原因。

如果有替代的想法,我愿意!

感谢您的帮助!

这不会像您预期的那样工作。 您创建了一个运算符内部运算符的案例。有关这意味着什么的信息,请参阅此

在你的例子中,你用 PythonOperator 包装了 S3KeySensor。这意味着当 PythonOperator 运行时它只执行 S3KeySensor 的初始化函数——它不会调用运算符本身的逻辑。 在运算符内部使用运算符是一种不好的做法。

你的情况更加极端,因为你试图在操作员内部使用传感器。传感器需要为每个戳周期调用 poke() 函数。 为简化起见 - 当您像您一样设置它们时,您无法享受带有 mode = 'reschedule' 的传感器的强大功能,因为重新安排意味着如果条件尚未满足但 PythonOperator 不知道,您想要释放工作人员怎么做。

如何解决您的问题:

选项 1:

根据您展示的代码,您可以简单地执行以下操作:

with TaskGroup('check_exists') as check_exists:
    
  path = 's3://my-bucket/data/my_file'
  dependency_name = 'my_file'

  S3KeySensor(
    task_id='check_' + dependency_name + '_exists',
    bucket_key=path,
    poke_interval=30,
    timeout=60 * 60,
    mode='reschedule'
  )

我没有看到这对你不起作用的原因。

选项 2:

如果由于某种原因选项 1 对您不利,则创建一个自定义传感器,它也接受 dependency_namepath 并像使用任何其他运算符一样使用它。 我没有测试它,但像下面这样的东西应该可以工作:

class MyS3KeySensor(S3KeySensor):
    def __init__(
        self,
        *,
        dependency_name:str = None,
        path: str = None,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.task_id = task_id = 'check_' + dependency_name + '_exists'
        self.bucket_name = path