当子文件夹名称相同时,Airflow Packaged Dags(压缩)发生冲突
Airflow Packaged Dags (zipped) clash when subfolders have same name
我们正在建立一个 Airflow 框架,多个数据科学家团队可以在其中编排他们的数据处理管道。我们开发了一个 Python 代码库来帮助他们实现 DAG,其中包括各种包和模块中的函数和 类(运算符子 类)。
每个团队都会将自己的 DAG 与函数和 类 一起打包在 ZIP 文件中。例如第一个 ZIP 文件将包含
ZIP1:
main_dag_teamA.py
subfolder1: package1-with-generic-functions + init.py
subfolder2: package2-with-generic-operators + init.py
另一个 ZIP 文件将包含
ZIP2:
main_dag_teamB.py
subfolder1: package1-with-generic-functions + init.py
subfolder2: package2-with-generic-operators + init.py
请注意,在两个 ZIP 文件中,子文件夹 1 和子文件夹 2 通常完全相同,这意味着具有相同功能和 类 的完全相同的文件。
但随着时间的推移,当新版本的包可用时,包内容将开始在 DAG 包中出现偏差。
通过此设置,我们遇到了以下问题:当 packages/subfolders 的内容开始在 ZIP 之间出现偏差时,Airflow 似乎无法很好地处理同名包。
因为当我 运行 "airflow list_dags" 它显示如下错误:
File "/data/share/airflow/dags/program1/program1.zip/program1.py", line 1, in > from subfolder1.functions1 import function1
ImportError: No module named 'subfolder1.functions1'
问题可以通过以下代码重现,其中两个小 DAG 与包 my_functions 一起在它们的 ZIP 文件中,它们具有相同的名称,但内容不同。
DAG 包 ZIP 1:
program1.py
from my_functions.functions1 import function1
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
def do_it():
print('program1')
dag = DAG(
'program1',
schedule_interval=None,
catchup=False,
start_date=datetime(2019, 6, 23)
)
hello_operator = PythonOperator(task_id='program1_task1', python_callable=do_it, dag=dag)
my_functions/functions1.py:
def function1():
print('function1')
DAG 压缩包 ZIP 2:
program2.py:
from my_functions.functions2 import function2
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
def do_it():
print('program1')
dag = DAG(
'program1',
schedule_interval=None,
catchup=False,
start_date=datetime(2019, 6, 23)
)
hello_operator = PythonOperator(task_id='program2_task2', python_callable=do_it, dag=dag)
my_functions/functions2.py:
def function2():
print('function2')
使用这两个 ZIP 文件,当我 运行 "airflow list_dags" 它显示错误:
文件“/data/share/airflow/dags/program1/program1.zip/program1.py”,第 1 行,在
来自 subfolder1.functions1 import function1 ImportError:没有名为 'subfolder1.functions1'
的模块
当ZIP中的子文件夹内容相同时,不会出现错误。
我的问题:如何防止 ZIP 中的子文件夹发生冲突?我真的很想拥有完全独立于代码的 DAG,以及它们自己的包版本。
通过在
之前的 DAG(program1.py 和 program2.py)顶部执行以下操作来解决
from my_functions.functions1 import function1
和
from my_functions.functions2 import function2
代码:
import sys
# Cleanup up the already imported function module
cleanup_mods = []
for mod in sys.modules:
if mod.startswith("function"):
cleanup_mods.append(mod)
for mod in cleanup_mods:
del sys.modules[mod]
这确保每次解析 DAG 时,导入的库都会被清理。
我们正在建立一个 Airflow 框架,多个数据科学家团队可以在其中编排他们的数据处理管道。我们开发了一个 Python 代码库来帮助他们实现 DAG,其中包括各种包和模块中的函数和 类(运算符子 类)。
每个团队都会将自己的 DAG 与函数和 类 一起打包在 ZIP 文件中。例如第一个 ZIP 文件将包含
ZIP1:
main_dag_teamA.py
subfolder1: package1-with-generic-functions + init.py
subfolder2: package2-with-generic-operators + init.py
另一个 ZIP 文件将包含
ZIP2:
main_dag_teamB.py
subfolder1: package1-with-generic-functions + init.py
subfolder2: package2-with-generic-operators + init.py
请注意,在两个 ZIP 文件中,子文件夹 1 和子文件夹 2 通常完全相同,这意味着具有相同功能和 类 的完全相同的文件。 但随着时间的推移,当新版本的包可用时,包内容将开始在 DAG 包中出现偏差。
通过此设置,我们遇到了以下问题:当 packages/subfolders 的内容开始在 ZIP 之间出现偏差时,Airflow 似乎无法很好地处理同名包。 因为当我 运行 "airflow list_dags" 它显示如下错误:
File "/data/share/airflow/dags/program1/program1.zip/program1.py", line 1, in > from subfolder1.functions1 import function1 ImportError: No module named 'subfolder1.functions1'
问题可以通过以下代码重现,其中两个小 DAG 与包 my_functions 一起在它们的 ZIP 文件中,它们具有相同的名称,但内容不同。
DAG 包 ZIP 1:
program1.py
from my_functions.functions1 import function1
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
def do_it():
print('program1')
dag = DAG(
'program1',
schedule_interval=None,
catchup=False,
start_date=datetime(2019, 6, 23)
)
hello_operator = PythonOperator(task_id='program1_task1', python_callable=do_it, dag=dag)
my_functions/functions1.py:
def function1():
print('function1')
DAG 压缩包 ZIP 2:
program2.py:
from my_functions.functions2 import function2
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
def do_it():
print('program1')
dag = DAG(
'program1',
schedule_interval=None,
catchup=False,
start_date=datetime(2019, 6, 23)
)
hello_operator = PythonOperator(task_id='program2_task2', python_callable=do_it, dag=dag)
my_functions/functions2.py:
def function2():
print('function2')
使用这两个 ZIP 文件,当我 运行 "airflow list_dags" 它显示错误:
文件“/data/share/airflow/dags/program1/program1.zip/program1.py”,第 1 行,在 来自 subfolder1.functions1 import function1 ImportError:没有名为 'subfolder1.functions1'
的模块当ZIP中的子文件夹内容相同时,不会出现错误。
我的问题:如何防止 ZIP 中的子文件夹发生冲突?我真的很想拥有完全独立于代码的 DAG,以及它们自己的包版本。
通过在
之前的 DAG(program1.py 和 program2.py)顶部执行以下操作来解决from my_functions.functions1 import function1
和
from my_functions.functions2 import function2
代码:
import sys
# Cleanup up the already imported function module
cleanup_mods = []
for mod in sys.modules:
if mod.startswith("function"):
cleanup_mods.append(mod)
for mod in cleanup_mods:
del sys.modules[mod]
这确保每次解析 DAG 时,导入的库都会被清理。