TypeError: unsupported type for timedelta days component: dict

TypeError: unsupported type for timedelta days component: dict

我在 Airflow DAG 中有一个传感器任务,如下所示:

sensor = ExternalTaskSensor(
  task_id='wait_for_parent',
  external_dag_id='parent',
  external_task_id='reporting',
  allowed_states=['success'],
  execution_date_fn=get_updated_exec_date,
  dag=dag)

函数get_updated_exec_date定义如下:

def get_updated_exec_date(execution_date, delta=-1):
  date_next_month=execution_date + relativedelta(months=1)
  start_next_month=date_next_month.replace(day=1)
  return start_next_month + timedelta(days=delta)

这在 Airflow 1.10.6 上完美运行。我们正在迁移到 1.10.15,在这个环境中,完全相同的代码在 return 语句和 TypeError: unsupported type for timedelta days component: dict

上崩溃

有人能帮忙吗?

Airflow 1.10.11 添加 PR 将上下文添加到 ExternalTaskSensor 中的 execution_date_fn。所以在您的代码中发生的是 context 被传递给 delta 并覆盖默认值 -1。由于 context 是 dict 类型,您遇到了错误。

修复方法是将函数签名替换为:

def get_updated_exec_date(execution_date, context)

因为 delta 总是 -1 只需将它移到函数中即可。