使用变量创建动态任务期间出现 Airflow Broken DAG 错误
Airflow Broken DAG error during dynamic task creation with variables
我正在尝试根据气流变量创建动态任务:
我的代码是:
default_args = {
'start_date': datetime(year=2021, month=6, day=20),
'provide_context': True
}
with DAG(
dag_id='Target_DIF',
default_args=default_args,
schedule_interval='@once',
description='ETL pipeline for processing users'
) as dag:
iterable_list = Variable.get("num_table")
for index, table in enumerate(iterable_list):
read_src1 = PythonOperator(
task_id=f'read_src_{table}'
python_callable=read_src,
)
upload_file_to_directory_bulk1 = PythonOperator(
task_id=f'upload_file_to_directory_bulk_{table}',
python_callable=upload_file_to_directory_bulk
)
write_Snowflake1 = PythonOperator(
task_id=f'write_Snowflake_{table}',
python_callable=write_Snowflake
)
# TaskGroup level dependencies
# DAG level dependencies
start >> read_src1 >> upload_file_to_directory_bulk1 >> write_Snowflake1 >> end
我遇到以下错误:
Broken DAG: [/home/dif/airflow/dags/target_dag.py] Traceback (most recent call last):
airflow.exceptions.AirflowException: The key (read_src_[) has to be made of alphanumeric characters, dashes, dots and underscores exclusively
代码在代码更改后工作完美:
#iterable_list = Variable.get("num_table")
iterable_list = ['inventories', 'products']
Start 和 End 是虚拟运算符。
Airflow 变量的数据如图所示。
我预期的动态工作流程:
我可以使用列表实现上述流程,但不能使用 Airflow 变量。
感谢任何找到错误原因的线索。谢谢。
问题是默认情况下,Airflow 将变量读取为 str
。试试这个:
iterable_list = Variable.get("num_table", deserialize_json=True)
Variable.get("num_table")
returns 字符串。
因此,您的循环实际上是在遍历 ['inventories, 'ptoducts']
的字符,这就是为什么在循环的第一次迭代中 task_id=f'read_src_{table}'
是 read_src_[
而 [
不是 [ 的有效字符=16=].
您应该将字符串转换为列表。
将您的变量保存为:"inventories,ptoducts"
然后您可以:
iterable_string = Variable.get("num_table")
iterable_list = iterable_string.split(",")
for index, table in enumerate(iterable_list):
您应该注意,使用 Variable.get("num_table")
作为顶级代码是一种非常糟糕的做法!
我能够通过以下修改得出解决方案:
import ast
...
...
iterable_string = Variable.get("num_table",default_var="[]")
iterable_list = ast.literal_eval(iterable_string)
...
气流变量存储为字符串。
所以我的数据存储为“[tab1,tab2]”。
所以我使用 literal_eval 将字符串转换回列表。
我还添加了一个空列表作为默认值,这样如果变量 num_table 中没有值,我将不会进一步处理。
我正在尝试根据气流变量创建动态任务:
我的代码是:
default_args = {
'start_date': datetime(year=2021, month=6, day=20),
'provide_context': True
}
with DAG(
dag_id='Target_DIF',
default_args=default_args,
schedule_interval='@once',
description='ETL pipeline for processing users'
) as dag:
iterable_list = Variable.get("num_table")
for index, table in enumerate(iterable_list):
read_src1 = PythonOperator(
task_id=f'read_src_{table}'
python_callable=read_src,
)
upload_file_to_directory_bulk1 = PythonOperator(
task_id=f'upload_file_to_directory_bulk_{table}',
python_callable=upload_file_to_directory_bulk
)
write_Snowflake1 = PythonOperator(
task_id=f'write_Snowflake_{table}',
python_callable=write_Snowflake
)
# TaskGroup level dependencies
# DAG level dependencies
start >> read_src1 >> upload_file_to_directory_bulk1 >> write_Snowflake1 >> end
我遇到以下错误:
Broken DAG: [/home/dif/airflow/dags/target_dag.py] Traceback (most recent call last):
airflow.exceptions.AirflowException: The key (read_src_[) has to be made of alphanumeric characters, dashes, dots and underscores exclusively
代码在代码更改后工作完美:
#iterable_list = Variable.get("num_table")
iterable_list = ['inventories', 'products']
Start 和 End 是虚拟运算符。 Airflow 变量的数据如图所示。
我预期的动态工作流程:
我可以使用列表实现上述流程,但不能使用 Airflow 变量。
感谢任何找到错误原因的线索。谢谢。
问题是默认情况下,Airflow 将变量读取为 str
。试试这个:
iterable_list = Variable.get("num_table", deserialize_json=True)
Variable.get("num_table")
returns 字符串。
因此,您的循环实际上是在遍历 ['inventories, 'ptoducts']
的字符,这就是为什么在循环的第一次迭代中 task_id=f'read_src_{table}'
是 read_src_[
而 [
不是 [ 的有效字符=16=].
您应该将字符串转换为列表。
将您的变量保存为:"inventories,ptoducts"
然后您可以:
iterable_string = Variable.get("num_table")
iterable_list = iterable_string.split(",")
for index, table in enumerate(iterable_list):
您应该注意,使用 Variable.get("num_table")
作为顶级代码是一种非常糟糕的做法!
我能够通过以下修改得出解决方案:
import ast
...
...
iterable_string = Variable.get("num_table",default_var="[]")
iterable_list = ast.literal_eval(iterable_string)
...
气流变量存储为字符串。 所以我的数据存储为“[tab1,tab2]”。 所以我使用 literal_eval 将字符串转换回列表。 我还添加了一个空列表作为默认值,这样如果变量 num_table 中没有值,我将不会进一步处理。