对 Airflow 的 BaseSensorOperator 参数感到困惑:超时、poke_interval 和模式

Confused about Airflow's BaseSensorOperator parameters : timeout, poke_interval and mode

我对 BaseSensorOperator 参数的工作方式有点困惑:timeoutpoke_interval。 考虑传感器的这种用法:

BaseSensorOperator(
  soft_fail=True,
  poke_interval = 4*60*60,  # Poke every 4 hours
  timeout = 12*60*60,  # Timeout after 12 hours
)

文档中提到了超时行为,以便在任务用完后将任务设置为 'fail'。但是我使用的是 soft_fail=True,我认为它不会保留相同的行为,因为我发现任务失败了,而不是在我使用了两个参数 soft_fail 和 [=14 之后跳过=].

那么这里发生了什么?

  1. 传感器每4小时戳一次,每次戳一次,都会等待超时时间(12小时)?
  2. 还是每4小时戳一次,总共戳3次,然后超时?
  3. 此外,如果我使用 mode="reschedule",这些参数会发生什么情况?

这是 BaseSensorOperator 的文档

class BaseSensorOperator(BaseOperator, SkipMixin):
    """
    Sensor operators are derived from this class and inherit these attributes.
    Sensor operators keep executing at a time interval and succeed when
    a criteria is met and fail if and when they time out.
    :param soft_fail: Set to true to mark the task as SKIPPED on failure
    :type soft_fail: bool
    :param poke_interval: Time in seconds that the job should wait in
        between each tries
    :type poke_interval: int
    :param timeout: Time, in seconds before the task times out and fails.
    :type timeout: int
    :param mode: How the sensor operates.
        Options are: ``{ poke | reschedule }``, default is ``poke``.
        When set to ``poke`` the sensor is taking up a worker slot for its
        whole execution time and sleeps between pokes. Use this mode if the
        expected runtime of the sensor is short or if a short poke interval
        is requried.
        When set to ``reschedule`` the sensor task frees the worker slot when
        the criteria is not yet met and it's rescheduled at a later time. Use
        this mode if the expected time until the criteria is met is. The poke
        inteval should be more than one minute to prevent too much load on
        the scheduler.
    :type mode: str
    """

定义术语

  1. poke_interval:持续时间b/w连续'pokes'(评价为'sensed'的必要条件)

  2. timeout:无限期地 是不可接受的(例如,如果您的错误代码在当月为 2 时戳到 29 日,它会持续戳长达 4 年)。因此,我们定义了一个最大周期,超过该周期我们将停止 poking 并终止(传感器标记为 FAILEDSKIPPED

  3. soft_fail:正常情况下(当soft_fail=False时),超时后传感器标记为FAILED。当 soft_fail=True 时,传感器将在超时后被标记为 SKIPPED

  4. mode:这个有点复杂

    • 任何任务(包括传感器)在运行时都会占用某个池中的 slotdefault 池或明确指定的 pool);本质上意味着它占用了一些资源。
    • 对于传感器,这是
      • 浪费:因为即使我们只是等待(不做任何实际工作
      • ,也会消耗一个插槽
      • 危险:如果您的工作流程中有太多传感器大约同时进入传感,它们会冻结大量资源相当长一段时间。事实上有太多 ExternalTaskSensors 是 for putting entire workflows (DAGs) into deadlocks
    • 为了克服这个问题,Airflow v1.10.2 introduced mode 传感器
      • mode='poke'(默认)表示我们上面讨论的现有行为
      • mode='reschedule' 意味着在 戳尝试 之后,而不是 going to sleep,传感器将表现得好像它失败了(在当前尝试中)并且它的状态将从 RUNNING 变为 UP_FOR_RETRY。这样,它将 释放 它的插槽,允许其他任务在等待另一次 戳尝试
      • 时继续进行
    • 在此处引用相关片段 from code
    if self.reschedule:
        reschedule_date = timezone.utcnow() + timedelta(
            seconds=self._get_next_poke_interval(started_at, try_number))
        raise AirflowRescheduleException(reschedule_date)
    else:
        sleep(self._get_next_poke_interval(started_at, try_number))
        try_number += 1
    

现在直接回答您的问题

Q1

  1. The sensor pokes every 4 hours, and at every poke, will wait for the duration of the timeout (12 hours)?
  2. Or does it poke every 4 hours, for a total of 3 pokes, then times out?

第2点正确

Q2

Also, what happens with these parameters if I use the mode="reschedule"?

如前所述,这些参数中的每一个都是独立的,设置 mode='reschedule' 不会以任何方式改变它们的行为

BaseSensorOperator(
  soft_fail=True,
  poke_interval = 4*60*60,  # Poke every 4 hours
  timeout = 12*60*60,  # Timeout of 12 hours
  mode = "reschedule"
)

假设第一次戳时不符合标准。所以它会在间隔 4 小时后再次 运行。但是由于我们使用的是 mode="reschedule".

,因此工作槽将在等待期间被释放

我是这么理解的