Airflow 2 中的 S3KeySensor
S3KeySensor in Airflow 2
我有一个名为 my_dag.py
的 dag,它利用 Airflow 2 中的 S3KeySensor 检查是否存在 s3 密钥。当我直接在 dag 内部使用传感器时,它起作用了:
with TaskGroup('check_exists') as check_exists:
path = 's3://my-bucket/data/my_file'
poke_interval = 30
timeout = 60*60
mode = 'reschedule'
dependency_name = 'my_file'
S3KeySensor(
task_id = 'check_' + dependency_name + '_exists',
bucket_key = path,
poke_interval = poke_interval,
timeout = timeout,
mode = mode
)
上面的日志看起来像:
[2022-05-03, 19:51:26 UTC] {s3.py:105} INFO - Poking for key : s3://my-bucket/data/my_file
[2022-05-03, 19:51:26 UTC] {base_aws.py:90} INFO - Retrieving region_name from Connection.extra_config['region_name']
[2022-05-03, 19:51:27 UTC] {taskinstance.py:1701} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
这是正确的。预计会重新安排,因为该文件尚不存在。
但是,我想检查其他 dag 中的任意数量的路径,所以我将传感器移动到另一个名为 helpers.py
的文件中名为 test
的函数中。我在调用 test
的任务组中的 my_dag.py
中使用 python 运算符。它看起来像这样:
with TaskGroup('check_exists') as check_exists:
path = 's3://my-bucket/data/my_file'
dependency_name = 'my_file'
wait_for_dependencies = PythonOperator(
task_id = 'wait_for_my_file',
python_callable = test,
op_kwargs = {
'dependency_name': dependency_name,
'path': path
},
dag = dag
)
wait_for_dependencies
helpers.py
中的函数 test
看起来像:
def test(dependency_name, path, poke_interval = 30, timeout = 60 * 60, mode = 'reschedule'):
S3KeySensor(
task_id = 'check_' + dependency_name + '_exists',
bucket_key = path,
poke_interval = poke_interval,
timeout = timeout,
mode = mode
)
但是,当我 运行 dag 时,该步骤被标记为成功,即使该文件不存在。日志显示:
[2022-05-03, 20:07:54 UTC] {python.py:175} INFO - Done. Returned value was: None
[2022-05-03, 20:07:54 UTC] {taskinstance.py:1282} INFO - Marking task as SUCCESS.
气流似乎不喜欢通过 python 操作员使用传感器。这是真的?还是我做错了什么?
我的目标是遍历多条路径并检查每条路径是否存在。但是,我在其他 dag 中这样做,这就是为什么我将传感器放在驻留在另一个文件中的函数中的原因。
如果有替代的想法,我愿意!
感谢您的帮助!
这不会像您预期的那样工作。
您创建了一个运算符内部运算符的案例。有关这意味着什么的信息,请参阅此 。
在你的例子中,你用 PythonOperator
包装了 S3KeySensor
。这意味着当 PythonOperator
运行时它只执行 S3KeySensor
的初始化函数——它不会调用运算符本身的逻辑。
在运算符内部使用运算符是一种不好的做法。
你的情况更加极端,因为你试图在操作员内部使用传感器。传感器需要为每个戳周期调用 poke()
函数。
为简化起见 - 当您像您一样设置它们时,您无法享受带有 mode = 'reschedule'
的传感器的强大功能,因为重新安排意味着如果条件尚未满足但 PythonOperator
不知道,您想要释放工作人员怎么做。
如何解决您的问题:
选项 1:
根据您展示的代码,您可以简单地执行以下操作:
with TaskGroup('check_exists') as check_exists:
path = 's3://my-bucket/data/my_file'
dependency_name = 'my_file'
S3KeySensor(
task_id='check_' + dependency_name + '_exists',
bucket_key=path,
poke_interval=30,
timeout=60 * 60,
mode='reschedule'
)
我没有看到这对你不起作用的原因。
选项 2:
如果由于某种原因选项 1 对您不利,则创建一个自定义传感器,它也接受 dependency_name
、path
并像使用任何其他运算符一样使用它。
我没有测试它,但像下面这样的东西应该可以工作:
class MyS3KeySensor(S3KeySensor):
def __init__(
self,
*,
dependency_name:str = None,
path: str = None,
**kwargs,
):
super().__init__(**kwargs)
self.task_id = task_id = 'check_' + dependency_name + '_exists'
self.bucket_name = path
我有一个名为 my_dag.py
的 dag,它利用 Airflow 2 中的 S3KeySensor 检查是否存在 s3 密钥。当我直接在 dag 内部使用传感器时,它起作用了:
with TaskGroup('check_exists') as check_exists:
path = 's3://my-bucket/data/my_file'
poke_interval = 30
timeout = 60*60
mode = 'reschedule'
dependency_name = 'my_file'
S3KeySensor(
task_id = 'check_' + dependency_name + '_exists',
bucket_key = path,
poke_interval = poke_interval,
timeout = timeout,
mode = mode
)
上面的日志看起来像:
[2022-05-03, 19:51:26 UTC] {s3.py:105} INFO - Poking for key : s3://my-bucket/data/my_file
[2022-05-03, 19:51:26 UTC] {base_aws.py:90} INFO - Retrieving region_name from Connection.extra_config['region_name']
[2022-05-03, 19:51:27 UTC] {taskinstance.py:1701} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
这是正确的。预计会重新安排,因为该文件尚不存在。
但是,我想检查其他 dag 中的任意数量的路径,所以我将传感器移动到另一个名为 helpers.py
的文件中名为 test
的函数中。我在调用 test
的任务组中的 my_dag.py
中使用 python 运算符。它看起来像这样:
with TaskGroup('check_exists') as check_exists:
path = 's3://my-bucket/data/my_file'
dependency_name = 'my_file'
wait_for_dependencies = PythonOperator(
task_id = 'wait_for_my_file',
python_callable = test,
op_kwargs = {
'dependency_name': dependency_name,
'path': path
},
dag = dag
)
wait_for_dependencies
helpers.py
中的函数 test
看起来像:
def test(dependency_name, path, poke_interval = 30, timeout = 60 * 60, mode = 'reschedule'):
S3KeySensor(
task_id = 'check_' + dependency_name + '_exists',
bucket_key = path,
poke_interval = poke_interval,
timeout = timeout,
mode = mode
)
但是,当我 运行 dag 时,该步骤被标记为成功,即使该文件不存在。日志显示:
[2022-05-03, 20:07:54 UTC] {python.py:175} INFO - Done. Returned value was: None
[2022-05-03, 20:07:54 UTC] {taskinstance.py:1282} INFO - Marking task as SUCCESS.
气流似乎不喜欢通过 python 操作员使用传感器。这是真的?还是我做错了什么?
我的目标是遍历多条路径并检查每条路径是否存在。但是,我在其他 dag 中这样做,这就是为什么我将传感器放在驻留在另一个文件中的函数中的原因。
如果有替代的想法,我愿意!
感谢您的帮助!
这不会像您预期的那样工作。
您创建了一个运算符内部运算符的案例。有关这意味着什么的信息,请参阅此
在你的例子中,你用 PythonOperator
包装了 S3KeySensor
。这意味着当 PythonOperator
运行时它只执行 S3KeySensor
的初始化函数——它不会调用运算符本身的逻辑。
在运算符内部使用运算符是一种不好的做法。
你的情况更加极端,因为你试图在操作员内部使用传感器。传感器需要为每个戳周期调用 poke()
函数。
为简化起见 - 当您像您一样设置它们时,您无法享受带有 mode = 'reschedule'
的传感器的强大功能,因为重新安排意味着如果条件尚未满足但 PythonOperator
不知道,您想要释放工作人员怎么做。
如何解决您的问题:
选项 1:
根据您展示的代码,您可以简单地执行以下操作:
with TaskGroup('check_exists') as check_exists:
path = 's3://my-bucket/data/my_file'
dependency_name = 'my_file'
S3KeySensor(
task_id='check_' + dependency_name + '_exists',
bucket_key=path,
poke_interval=30,
timeout=60 * 60,
mode='reschedule'
)
我没有看到这对你不起作用的原因。
选项 2:
如果由于某种原因选项 1 对您不利,则创建一个自定义传感器,它也接受 dependency_name
、path
并像使用任何其他运算符一样使用它。
我没有测试它,但像下面这样的东西应该可以工作:
class MyS3KeySensor(S3KeySensor):
def __init__(
self,
*,
dependency_name:str = None,
path: str = None,
**kwargs,
):
super().__init__(**kwargs)
self.task_id = task_id = 'check_' + dependency_name + '_exists'
self.bucket_name = path