气流传感器中的模式 "reschedule" 是如何工作的?
How does the mode "reschedule" in Airflow Sensors work?
我有一个 Airflow Http 传感器,它调用 REST 端点并检查 API
返回的 JSON 结构中的特定值
sensor = HttpSensor(
soft_fail=True,
task_id='http_sensor_check',
http_conn_id='http_default',
endpoint='http://localhost:8082/api/v1/resources/games/all',
request_params={},
response_check=lambda response: True if check_api_response(response) is True else False,
mode='reschedule',
dag=dag)
如果 response_check 为假,DAG 将处于“up_for_reschedule”状态。问题是,DAG 永远保持这种状态,从未重新安排。
我的问题是:
- “up_for_reschedule”是什么意思? DAG 何时重新安排?
- 假设我的 DAG 计划为每 5 分钟 运行,但由于传感器的原因,“up_for_reschedule”DAG 实例与新的 运行 重叠,我将有 2 DAGS 运行同时宁?
提前谢谢你。
In sensor mode='reschedule'
意味着如果传感器的条件不是 True,则传感器将释放 worker 以执行其他任务。这对于传感器可能等待很长时间的情况非常有用。
up_for_reschedule
表示传感器条件尚未为真且尚未达到超时,因此任务正在等待调度程序重新安排。
- 你不知道任务什么时候会运行。这取决于调度程序(可用资源、优先级等)。如果您不想允许并行 dag 运行s 在 DAG 构造函数中使用
max_active_runs=1
。
我有一个 Airflow Http 传感器,它调用 REST 端点并检查 API
返回的 JSON 结构中的特定值sensor = HttpSensor(
soft_fail=True,
task_id='http_sensor_check',
http_conn_id='http_default',
endpoint='http://localhost:8082/api/v1/resources/games/all',
request_params={},
response_check=lambda response: True if check_api_response(response) is True else False,
mode='reschedule',
dag=dag)
如果 response_check 为假,DAG 将处于“up_for_reschedule”状态。问题是,DAG 永远保持这种状态,从未重新安排。
我的问题是:
- “up_for_reschedule”是什么意思? DAG 何时重新安排?
- 假设我的 DAG 计划为每 5 分钟 运行,但由于传感器的原因,“up_for_reschedule”DAG 实例与新的 运行 重叠,我将有 2 DAGS 运行同时宁?
提前谢谢你。
In sensor mode='reschedule'
意味着如果传感器的条件不是 True,则传感器将释放 worker 以执行其他任务。这对于传感器可能等待很长时间的情况非常有用。
up_for_reschedule
表示传感器条件尚未为真且尚未达到超时,因此任务正在等待调度程序重新安排。- 你不知道任务什么时候会运行。这取决于调度程序(可用资源、优先级等)。如果您不想允许并行 dag 运行s 在 DAG 构造函数中使用
max_active_runs=1
。