Airflow 不允许我将 try_number 宏转换为整数
Airflow won't let me convert try_number macros to an integer
我的问题很简单,但是还没有真正提供真正的解决方案。我尝试构建的 DAG 将有一个关键的环境变量,该变量可能会根据它所经历的重试次数而改变。比如重试3次后我们将下面代码中提到的环境变量的值"critical_value"
切换成值
所以当我推送下面的代码时,我得到以下损坏的 DAG 错误:
Broken DAG: [/usr/local/airflow/dags/sync/dags/example_dag.py] invalid literal for int() with base 10: '{{ task_instance.try_number }}'
这是我们在尝试将字符串转换为 int 时遇到的错误。在这种情况下,在 运行 时 '{{ task_instance.try_number }}'
将取 运行 的尝试次数的适当值。我也试过删除 int() 转换器,但在比较整数和字符串时出现错误。
任何帮助将不胜感激!提前谢谢你。
# Required DAG arguments
default_args = {
'owner':airflow,
"depend_on_past":False,
'start_date':datetime(2021, 6, 4),
"retries": 3
}
# Instantiate DAG
with DAG(
example_dag,
default_args=default_args,
schedule_interval=@once,
) as dag:
# Run python script of loader job
full_script_task = KubernetesPodOperator(
name=example_task,
task_id=example_task_id,
namespace=Variable.get("kubernetes_namespace"),
image=some_image,
configmaps=CONFIG_MAPS,
secrets=SECRETS,
volumes=[logging_volume],
volume_mounts=[logging_volume_mount],
env_vars={
"critical_value": "value_if_true" if int('{{ task_instance.try_number }}') >= 3 else "value_if_false"
}
)
正如我们通过您的测试确认的那样,您不能在参数内的 if
子句中使用 jinja 模板化变量。您尝试了这个并且变量被正确读取:
env_vars={
"critical_value": "{{ task_instance.try_number }}"
}
你可以做的是在运算符中处理 try_number
而不是作用于 critical_value
,只需使用 try_number
.
Aiflow 可以处理 if
表达式并在 Jinja 中正确输入评估,但整个值需要在模板语法中。试试这个(根据 KubernetesPodOperator
的文档,env_vars
需要是 list
,否则该值将是一个字符串化列表):
env_vars=[
{
"critical_value": "{{ 'value_if_true' if task_instance.try_number >= 3 else 'value_if_false' }}"
}
],
我的问题很简单,但是还没有真正提供真正的解决方案。我尝试构建的 DAG 将有一个关键的环境变量,该变量可能会根据它所经历的重试次数而改变。比如重试3次后我们将下面代码中提到的环境变量的值"critical_value"
切换成值
所以当我推送下面的代码时,我得到以下损坏的 DAG 错误:
Broken DAG: [/usr/local/airflow/dags/sync/dags/example_dag.py] invalid literal for int() with base 10: '{{ task_instance.try_number }}'
这是我们在尝试将字符串转换为 int 时遇到的错误。在这种情况下,在 运行 时 '{{ task_instance.try_number }}'
将取 运行 的尝试次数的适当值。我也试过删除 int() 转换器,但在比较整数和字符串时出现错误。
任何帮助将不胜感激!提前谢谢你。
# Required DAG arguments
default_args = {
'owner':airflow,
"depend_on_past":False,
'start_date':datetime(2021, 6, 4),
"retries": 3
}
# Instantiate DAG
with DAG(
example_dag,
default_args=default_args,
schedule_interval=@once,
) as dag:
# Run python script of loader job
full_script_task = KubernetesPodOperator(
name=example_task,
task_id=example_task_id,
namespace=Variable.get("kubernetes_namespace"),
image=some_image,
configmaps=CONFIG_MAPS,
secrets=SECRETS,
volumes=[logging_volume],
volume_mounts=[logging_volume_mount],
env_vars={
"critical_value": "value_if_true" if int('{{ task_instance.try_number }}') >= 3 else "value_if_false"
}
)
正如我们通过您的测试确认的那样,您不能在参数内的 if
子句中使用 jinja 模板化变量。您尝试了这个并且变量被正确读取:
env_vars={
"critical_value": "{{ task_instance.try_number }}"
}
你可以做的是在运算符中处理 try_number
而不是作用于 critical_value
,只需使用 try_number
.
Aiflow 可以处理 if
表达式并在 Jinja 中正确输入评估,但整个值需要在模板语法中。试试这个(根据 KubernetesPodOperator
的文档,env_vars
需要是 list
,否则该值将是一个字符串化列表):
env_vars=[
{
"critical_value": "{{ 'value_if_true' if task_instance.try_number >= 3 else 'value_if_false' }}"
}
],