使用变量创建动态任务期间出现 Airflow Broken DAG 错误

Airflow Broken DAG error during dynamic task creation with variables

我正在尝试根据气流变量创建动态任务:

我的代码是:

default_args = {
    'start_date': datetime(year=2021, month=6, day=20),
    'provide_context': True
}

with DAG(
        dag_id='Target_DIF',
        default_args=default_args,
        schedule_interval='@once',
        description='ETL pipeline for processing users'
) as dag:


    iterable_list = Variable.get("num_table")
    for index, table in enumerate(iterable_list):
        read_src1 = PythonOperator(
            task_id=f'read_src_{table}'
            python_callable=read_src,
        )
        upload_file_to_directory_bulk1 = PythonOperator(
            task_id=f'upload_file_to_directory_bulk_{table}',
            python_callable=upload_file_to_directory_bulk
        )
        write_Snowflake1 = PythonOperator(
            task_id=f'write_Snowflake_{table}',
            python_callable=write_Snowflake
        )

        # TaskGroup level dependencies

        # DAG level dependencies
        start >> read_src1 >> upload_file_to_directory_bulk1 >> write_Snowflake1 >> end

我遇到以下错误:

Broken DAG: [/home/dif/airflow/dags/target_dag.py] Traceback (most recent call last):
airflow.exceptions.AirflowException: The key (read_src_[) has to be made of alphanumeric characters, dashes, dots and underscores exclusively

代码在代码更改后工作完美:

#iterable_list = Variable.get("num_table")
iterable_list = ['inventories', 'products']

Start 和 End 是虚拟运算符。 Airflow 变量的数据如图所示。

我预期的动态工作流程:

我可以使用列表实现上述流程,但不能使用 Airflow 变量。

感谢任何找到错误原因的线索。谢谢。

问题是默认情况下,Airflow 将变量读取为 str。试试这个:

iterable_list = Variable.get("num_table", deserialize_json=True)

Variable.get("num_table") returns 字符串。 因此,您的循环实际上是在遍历 ['inventories, 'ptoducts'] 的字符,这就是为什么在循环的第一次迭代中 task_id=f'read_src_{table}'read_src_[[ 不是 [ 的有效字符=16=].

您应该将字符串转换为列表。

将您的变量保存为:"inventories,ptoducts" 然后您可以:

iterable_string = Variable.get("num_table")
iterable_list = iterable_string.split(",")
for index, table in enumerate(iterable_list):

您应该注意,使用 Variable.get("num_table") 作为顶级代码是一种非常糟糕的做法!

我能够通过以下修改得出解决方案:

import ast
...
...
    iterable_string = Variable.get("num_table",default_var="[]")
    iterable_list = ast.literal_eval(iterable_string)
...

气流变量存储为字符串。 所以我的数据存储为“[tab1,tab2]”。 所以我使用 literal_eval 将字符串转换回列表。 我还添加了一个空列表作为默认值,这样如果变量 num_table 中没有值,我将不会进一步处理。