当子文件夹名称相同时,Airflow Packaged Dags(压缩)发生冲突

Airflow Packaged Dags (zipped) clash when subfolders have same name

我们正在建立一个 Airflow 框架,多个数据科学家团队可以在其中编排他们的数据处理管道。我们开发了一个 Python 代码库来帮助他们实现 DAG,其中包括各种包和模块中的函数和 类(运算符子 类)。

每个团队都会将自己的 DAG 与函数和 类 一起打包在 ZIP 文件中。例如第一个 ZIP 文件将包含

ZIP1:

main_dag_teamA.py

subfolder1: package1-with-generic-functions + init.py

subfolder2: package2-with-generic-operators + init.py

另一个 ZIP 文件将包含

ZIP2:

main_dag_teamB.py

subfolder1: package1-with-generic-functions + init.py

subfolder2: package2-with-generic-operators + init.py

请注意,在两个 ZIP 文件中,子文件夹 1 和子文件夹 2 通常完全相同,这意味着具有相同功能和 类 的完全相同的文件。 但随着时间的推移,当新版本的包可用时,包内容将开始在 DAG 包中出现偏差。

通过此设置,我们遇到了以下问题:当 packages/subfolders 的内容开始在 ZIP 之间出现偏差时,Airflow 似乎无法很好地处理同名包。 因为当我 运行 "airflow list_dags" 它显示如下错误:

File "/data/share/airflow/dags/program1/program1.zip/program1.py", line 1, in > from subfolder1.functions1 import function1 ImportError: No module named 'subfolder1.functions1'

问题可以通过以下代码重现,其中两个小 DAG 与包 my_functions 一起在它们的 ZIP 文件中,它们具有相同的名称,但内容不同。

DAG 包 ZIP 1:

program1.py

from my_functions.functions1 import function1

from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def do_it():
    print('program1')

dag = DAG(
    'program1',
    schedule_interval=None,
    catchup=False,
    start_date=datetime(2019, 6, 23)
)

hello_operator = PythonOperator(task_id='program1_task1', python_callable=do_it, dag=dag)

my_functions/functions1.py:

def function1():
    print('function1')

DAG 压缩包 ZIP 2:

program2.py:

from my_functions.functions2 import function2

from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def do_it():
    print('program1')

dag = DAG(
    'program1',
    schedule_interval=None,
    catchup=False,
    start_date=datetime(2019, 6, 23)
)

hello_operator = PythonOperator(task_id='program2_task2', python_callable=do_it, dag=dag)

my_functions/functions2.py:

def function2():
    print('function2')

使用这两个 ZIP 文件,当我 运行 "airflow list_dags" 它显示错误:

文件“/data/share/airflow/dags/program1/program1.zip/program1.py”,第 1 行,在 来自 subfolder1.functions1 import function1 ImportError:没有名为 'subfolder1.functions1'

的模块

当ZIP中的子文件夹内容相同时,不会出现错误。

我的问题:如何防止 ZIP 中的子文件夹发生冲突?我真的很想拥有完全独立于代码的 DAG,以及它们自己的包版本。

通过在

之前的 DAG(program1.py 和 program2.py)顶部执行以下操作来解决
from my_functions.functions1 import function1

from my_functions.functions2 import function2

代码:

import sys

# Cleanup up the already imported function module
cleanup_mods = []
for mod in sys.modules:
    if mod.startswith("function"):
        cleanup_mods.append(mod)
for mod in cleanup_mods:
    del sys.modules[mod]

这确保每次解析 DAG 时,导入的库都会被清理。