Airflow 不允许我将 try_number 宏转换为整数

Airflow won't let me convert try_number macros to an integer

我的问题很简单,但是还没有真正提供真正的解决方案。我尝试构建的 DAG 将有一个关键的环境变量,该变量可能会根据它所经历的重试次数而改变。比如重试3次后我们将下面代码中提到的环境变量的值"critical_value"切换成值

所以当我推送下面的代码时,我得到以下损坏的 DAG 错误:

Broken DAG: [/usr/local/airflow/dags/sync/dags/example_dag.py] invalid literal for int() with base 10: '{{ task_instance.try_number }}'

这是我们在尝试将字符串转换为 int 时遇到的错误。在这种情况下,在 运行 时 '{{ task_instance.try_number }}' 将取 运行 的尝试次数的适当值。我也试过删除 int() 转换器,但在比较整数和字符串时出现错误。

任何帮助将不胜感激!提前谢谢你。

# Required DAG arguments
default_args = {
    'owner':airflow,
    "depend_on_past":False,
    'start_date':datetime(2021, 6, 4),

    "retries": 3
}

# Instantiate DAG
with DAG(
    example_dag,
    default_args=default_args,
    schedule_interval=@once,
    ) as dag:
    
    # Run python script of loader job
    full_script_task = KubernetesPodOperator(
        name=example_task,
        task_id=example_task_id,
        namespace=Variable.get("kubernetes_namespace"),
        image=some_image,
        configmaps=CONFIG_MAPS,
        secrets=SECRETS,
        volumes=[logging_volume],
        volume_mounts=[logging_volume_mount],
        env_vars={
            "critical_value": "value_if_true" if int('{{ task_instance.try_number }}') >= 3 else "value_if_false"
        }
)

正如我们通过您的测试确认的那样,您不能在参数内的 if 子句中使用 jinja 模板化变量。您尝试了这个并且变量被正确读取:

env_vars={
    "critical_value": "{{ task_instance.try_number }}"
}

你可以做的是在运算符中处理 try_number 而不是作用于 critical_value,只需使用 try_number.

Aiflow 可以处理 if 表达式并在 Jinja 中正确输入评估,但整个值需要在模板语法中。试试这个(根据 KubernetesPodOperator 的文档,env_vars 需要是 list,否则该值将是一个字符串化列表):

env_vars=[
            {
                "critical_value": "{{ 'value_if_true' if task_instance.try_number >= 3 else 'value_if_false' }}"
            }
        ],