(Django)气流中的 ORM - 可能吗?
(Django) ORM in airflow - is it possible?
如何在 Airflow 任务中使用 Django 模型?
根据 Airflow 官方文档,Airflow 提供了用于与数据库交互的钩子(如 MySqlHook / PostgresHook 等),这些钩子稍后可以在 Operators 中用于行查询执行。附上核心代码片段:
复制自https://airflow.apache.org/_modules/mysql_hook.html
class MySqlHook(DbApiHook):
conn_name_attr = 'mysql_conn_id'
default_conn_name = 'mysql_default'
supports_autocommit = True
def get_conn(self):
"""
Returns a mysql connection object
"""
conn = self.get_connection(self.mysql_conn_id)
conn_config = {
"user": conn.login,
"passwd": conn.password or ''
}
conn_config["host"] = conn.host or 'localhost'
conn_config["db"] = conn.schema or ''
conn = MySQLdb.connect(**conn_config)
return conn
复制自https://airflow.apache.org/_modules/mysql_operator.html
class MySqlOperator(BaseOperator):
@apply_defaults
def __init__(
self, sql, mysql_conn_id='mysql_default', parameters=None,
autocommit=False, *args, **kwargs):
super(MySqlOperator, self).__init__(*args, **kwargs)
self.mysql_conn_id = mysql_conn_id
self.sql = sql
self.autocommit = autocommit
self.parameters = parameters
def execute(self, context):
logging.info('Executing: ' + str(self.sql))
hook = MySqlHook(mysql_conn_id=self.mysql_conn_id)
hook.run(
self.sql,
autocommit=self.autocommit,
parameters=self.parameters)
正如我们所见,Hook 封装了连接配置,而 Operator 提供了执行自定义查询的能力。
问题:
使用不同的 ORM 而不是原始的 SQL 来获取和处理数据库对象非常方便,原因如下:
- 在简单的情况下,ORM 可能是更方便的解决方案,请参阅 ORM definitions。
- 假设已经有像 Django 这样具有定义模型及其方法的系统。每次这些模型的模式发生变化时,airflow raw SQL 查询都需要重写。 ORM 为使用此类模型提供了统一的界面。
出于某种原因,在钩子和运算符方面,Airflow 任务中没有使用 ORM 的示例。根据Using Django database layer outside of Django? question, it's needed to set up a connection configuration to the database, and then straight-forwardly execute queires in ORM, but doing that outside appropriate hooks / operators breaks Airflow principles。这就像用 "python work_with_django_models.py" 命令调用 BashOperator。
最后,我们想要这个:
那么在这种情况下最好的做法是什么?我们是否共享 Django ORM / 其他 ORM 的钩子 / 运算符?为了使以下代码真实(视为伪代码!):
import os
import django
os.environ.setdefault(
"DJANGO_SETTINGS_MODULE",
"myapp.settings"
)
django.setup()
from your_app import models
def get_and_modify_models(ds, **kwargs):
all_objects = models.MyModel.objects.filter(my_str_field = 'abc')
all_objects[15].my_int_field = 25
all_objects[15].save()
return list(all_objects)
django_op = DjangoOperator(task_id='get_and_modify_models', owner='airflow')
而不是在原始 SQL 中实现此功能。
我认为这是一个非常重要的话题,因为在这种情况下,所有基于 ORM 的框架和流程都无法深入研究 Airflow。
提前致谢!
我同意我们应该继续讨论,因为访问 Django ORM 可以显着降低解决方案的复杂性。
我的方法是 1) 创建一个 DjangoOperator
import os, sys
from airflow.models import BaseOperator
def setup_django_for_airflow():
# Add Django project root to path
sys.path.append('./project_root/')
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myapp.settings")
import django
django.setup()
class DjangoOperator(BaseOperator):
def pre_execute(self, *args, **kwargs):
setup_django_for_airflow()
和 2) 为逻辑/运算符扩展 DjangoOperator,访问 ORM 会受益
from .base import DjangoOperator
class DjangoExampleOperator(DjangoOperator):
def execute(self, context):
from myApp.models import model
model.objects.get_or_create()
使用此策略,您可以区分使用 Raw SQL / ORM 的运算符。另请注意,对于 Django 运算符,所有 Django 模型导入都需要在执行上下文中,如上所示。
如何在 Airflow 任务中使用 Django 模型?
根据 Airflow 官方文档,Airflow 提供了用于与数据库交互的钩子(如 MySqlHook / PostgresHook 等),这些钩子稍后可以在 Operators 中用于行查询执行。附上核心代码片段:
复制自https://airflow.apache.org/_modules/mysql_hook.html
class MySqlHook(DbApiHook):
conn_name_attr = 'mysql_conn_id'
default_conn_name = 'mysql_default'
supports_autocommit = True
def get_conn(self):
"""
Returns a mysql connection object
"""
conn = self.get_connection(self.mysql_conn_id)
conn_config = {
"user": conn.login,
"passwd": conn.password or ''
}
conn_config["host"] = conn.host or 'localhost'
conn_config["db"] = conn.schema or ''
conn = MySQLdb.connect(**conn_config)
return conn
复制自https://airflow.apache.org/_modules/mysql_operator.html
class MySqlOperator(BaseOperator):
@apply_defaults
def __init__(
self, sql, mysql_conn_id='mysql_default', parameters=None,
autocommit=False, *args, **kwargs):
super(MySqlOperator, self).__init__(*args, **kwargs)
self.mysql_conn_id = mysql_conn_id
self.sql = sql
self.autocommit = autocommit
self.parameters = parameters
def execute(self, context):
logging.info('Executing: ' + str(self.sql))
hook = MySqlHook(mysql_conn_id=self.mysql_conn_id)
hook.run(
self.sql,
autocommit=self.autocommit,
parameters=self.parameters)
正如我们所见,Hook 封装了连接配置,而 Operator 提供了执行自定义查询的能力。
问题:
使用不同的 ORM 而不是原始的 SQL 来获取和处理数据库对象非常方便,原因如下:
- 在简单的情况下,ORM 可能是更方便的解决方案,请参阅 ORM definitions。
- 假设已经有像 Django 这样具有定义模型及其方法的系统。每次这些模型的模式发生变化时,airflow raw SQL 查询都需要重写。 ORM 为使用此类模型提供了统一的界面。
出于某种原因,在钩子和运算符方面,Airflow 任务中没有使用 ORM 的示例。根据Using Django database layer outside of Django? question, it's needed to set up a connection configuration to the database, and then straight-forwardly execute queires in ORM, but doing that outside appropriate hooks / operators breaks Airflow principles。这就像用 "python work_with_django_models.py" 命令调用 BashOperator。
最后,我们想要这个:
那么在这种情况下最好的做法是什么?我们是否共享 Django ORM / 其他 ORM 的钩子 / 运算符?为了使以下代码真实(视为伪代码!):
import os
import django
os.environ.setdefault(
"DJANGO_SETTINGS_MODULE",
"myapp.settings"
)
django.setup()
from your_app import models
def get_and_modify_models(ds, **kwargs):
all_objects = models.MyModel.objects.filter(my_str_field = 'abc')
all_objects[15].my_int_field = 25
all_objects[15].save()
return list(all_objects)
django_op = DjangoOperator(task_id='get_and_modify_models', owner='airflow')
而不是在原始 SQL 中实现此功能。
我认为这是一个非常重要的话题,因为在这种情况下,所有基于 ORM 的框架和流程都无法深入研究 Airflow。
提前致谢!
我同意我们应该继续讨论,因为访问 Django ORM 可以显着降低解决方案的复杂性。
我的方法是 1) 创建一个 DjangoOperator
import os, sys
from airflow.models import BaseOperator
def setup_django_for_airflow():
# Add Django project root to path
sys.path.append('./project_root/')
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myapp.settings")
import django
django.setup()
class DjangoOperator(BaseOperator):
def pre_execute(self, *args, **kwargs):
setup_django_for_airflow()
和 2) 为逻辑/运算符扩展 DjangoOperator,访问 ORM 会受益
from .base import DjangoOperator
class DjangoExampleOperator(DjangoOperator):
def execute(self, context):
from myApp.models import model
model.objects.get_or_create()
使用此策略,您可以区分使用 Raw SQL / ORM 的运算符。另请注意,对于 Django 运算符,所有 Django 模型导入都需要在执行上下文中,如上所示。