(Django)气流中的 ORM - 可能吗?

(Django) ORM in airflow - is it possible?

如何在 Airflow 任务中使用 Django 模型?

根据 Airflow 官方文档,Airflow 提供了用于与数据库交互的钩子(如 MySqlHook / PostgresHook 等),这些钩子稍后可以在 Operators 中用于行查询执行。附上核心代码片段:

复制自https://airflow.apache.org/_modules/mysql_hook.html

class MySqlHook(DbApiHook):
    conn_name_attr = 'mysql_conn_id'
    default_conn_name = 'mysql_default'
    supports_autocommit = True

    def get_conn(self):
        """
        Returns a mysql connection object
        """
        conn = self.get_connection(self.mysql_conn_id)
        conn_config = {
            "user": conn.login,
            "passwd": conn.password or ''
        }
        conn_config["host"] = conn.host or 'localhost'
        conn_config["db"] = conn.schema or ''
        conn = MySQLdb.connect(**conn_config)
        return conn

复制自https://airflow.apache.org/_modules/mysql_operator.html

class MySqlOperator(BaseOperator):
    @apply_defaults
    def __init__(
            self, sql, mysql_conn_id='mysql_default', parameters=None,
            autocommit=False, *args, **kwargs):
        super(MySqlOperator, self).__init__(*args, **kwargs)
        self.mysql_conn_id = mysql_conn_id
        self.sql = sql
        self.autocommit = autocommit
        self.parameters = parameters

    def execute(self, context):
        logging.info('Executing: ' + str(self.sql))
        hook = MySqlHook(mysql_conn_id=self.mysql_conn_id)
        hook.run(
            self.sql,
            autocommit=self.autocommit,
            parameters=self.parameters)

正如我们所见,Hook 封装了连接配置,而 Operator 提供了执行自定义查询的能力。

问题:

使用不同的 ORM 而不是原始的 SQL 来获取和处理数据库对象非常方便,原因如下:

  1. 在简单的情况下,ORM 可能是更方便的解决方案,请参阅 ORM definitions
  2. 假设已经有像 Django 这样具有定义模型及其方法的系统。每次这些模型的模式发生变化时,airflow raw SQL 查询都需要重写。 ORM 为使用此类模型提供了统一的界面。

出于某种原因,在钩子和运算符方面,Airflow 任务中没有使用 ORM 的示例。根据Using Django database layer outside of Django? question, it's needed to set up a connection configuration to the database, and then straight-forwardly execute queires in ORM, but doing that outside appropriate hooks / operators breaks Airflow principles。这就像用 "python work_with_django_models.py" 命令调用 BashOperator。

最后,我们想要这个:

那么在这种情况下最好的做法是什么?我们是否共享 Django ORM / 其他 ORM 的钩子 / 运算符?为了使以下代码真实(视为伪代码!):

import os
import django
os.environ.setdefault(
    "DJANGO_SETTINGS_MODULE",
    "myapp.settings"
)
django.setup()
from your_app import models

def get_and_modify_models(ds, **kwargs):
    all_objects = models.MyModel.objects.filter(my_str_field = 'abc')
    all_objects[15].my_int_field = 25
    all_objects[15].save()
    return list(all_objects)

django_op = DjangoOperator(task_id='get_and_modify_models', owner='airflow')

而不是在原始 SQL 中实现此功能。

我认为这是一个非常重要的话题,因为在这种情况下,所有基于 ORM 的框架和流程都无法深入研究 Airflow。

提前致谢!

我同意我们应该继续讨论,因为访问 Django ORM 可以显着降低解决方案的复杂性。

我的方法是 1) 创建一个 DjangoOperator

import os, sys

from airflow.models import BaseOperator


def setup_django_for_airflow():
    # Add Django project root to path
    sys.path.append('./project_root/')

    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myapp.settings")

    import django
    django.setup()


class DjangoOperator(BaseOperator):

    def pre_execute(self, *args, **kwargs):
        setup_django_for_airflow()

和 2) 为逻辑/运算符扩展 DjangoOperator,访问 ORM 会受益

from .base import DjangoOperator


class DjangoExampleOperator(DjangoOperator):

    def execute(self, context):
        from myApp.models import model
        model.objects.get_or_create()

使用此策略,您可以区分使用 Raw SQL / ORM 的运算符。另请注意,对于 Django 运算符,所有 Django 模型导入都需要在执行上下文中,如上所示。