Django 从原始查询创建管理列表

Django create admin list from raw query

我愿意在管理员中创建查询集列表模板

查询集是通过 cursor.execute() 获得的,因此可能需要一个假模型。

基本上我只想利用 Django admin list 提供的分页和过滤功能。

这是我的代码的相关部分

models.py

class Query(object):

   def __init__(self, sql):

       self.sql = sql


   def execute_query(self):

        cursor = connection.cursor()
        start_time = time()

        try:
            cursor.execute(self.sql)
        except DatabaseError as e:
            cursor.close()
            raise e

        return cursor, ((time() - start_time) * 1000)

admin.py

 class QueryAdmin(admin.ModelAdmin):
 ....


admin.site.register(Query, QueryAdmin)

如果您只是尝试使用 django 管理功能来利用它的分页和过滤功能,您会发现自己花费的时间比使用 django pagination 时要多得多。首先,您会发现分页和过滤 将无法使用 和 cursor.execute

如果您确实需要某种管理功能,请继续阅读。

在 django admin 中使用自定义查询集的首选方法是覆盖 admin class.

中的 get_queryset 方法

The get_queryset method on a ModelAdmin returns a QuerySet of all model instances that can be edited by the admin site. One use case for overriding this method is to show objects owned by the logged-in user:

所以我们的代码会像

class QueryAdmin(admin.ModelAdmin):
    def queryset(self, request, queryset):

        return SomeModel.objects.raw('SOME QUERY')

如果您想 return 自定义原始查询集,您可以在此处通过对对象管理器使用原始方法调用来实现。它比使用 cursor.execute 更可取。您实际上需要这样的方法 (objects.raw) 而不是 execute 来使用一些 django 管理功能。

即使是现在你也会发现分页不起作用。然后你需要这样做:django pagination and RawQuerySet

我刚刚实现了这个功能。您需要新视图 RawChangeList 扩展 ChangeList 来自 Django 的视图:

from django.contrib.admin.views.main import ChangeList
from django.db import connections

class RawChangeList(ChangeList):
    """
    Extended Django ChangeList to be able show data from RawQueryset.
    """
    def get_count(self):
        connection = connections[self.queryset.db]
        with connection.cursor() as c:
            if connection.vendor == 'microsoft':  # CTE in subquery is not working in SQL Server
                c.execute(self.queryset.raw_query)
                c.execute('SELECT @@ROWCOUNT')
            else:
                query = 'SELECT COUNT(*) FROM ({query}) AS sq'
                c.execute(query.format(query=self.queryset.raw_query))

            return c.fetchone()[0]

    def get_queryset_slice(self):
        connection = connections[self.queryset.db]
        if connection.vendor == 'microsoft':
            # SQL Server needs ordered query for slicing
            if hasattr(self.queryset, 'ordered') and self.queryset.ordered:
                query = '{query}'
            else:
                query = '{query} ORDER BY 1'
            query += ' OFFSET {offset} ROWS FETCH NEXT {limit} ROWS ONLY'
        else:
            query = '{query} LIMIT {limit} OFFSET {offset}'

        return self.queryset.model.objects.raw(
            query.format(
                query=self.queryset.raw_query,
                offset=self.page_num * self.list_per_page,
                limit=(self.page_num + 1) * self.list_per_page - self.page_num * self.list_per_page,
            )
        )

    def get_queryset(self, request):
        """
        Overriding to avoid applying filters in ChangeList because RawQueryset has not filter method.
        So any filters has to be applied manually for now.
        """
        qs = self.root_queryset
        if not hasattr(qs, 'count'):
            qs.count = lambda: self.get_count()
        return qs

    def get_results(self, request):
        if self.show_all:
            qs = self.queryset
        else:
            qs = self.get_queryset_slice()

        paginator = self.model_admin.get_paginator(request, self.queryset, self.list_per_page)

        self.result_count = paginator.count
        self.show_full_result_count = False
        self.show_admin_actions = True
        self.full_result_count = 0
        self.result_list = list(qs)
        self.can_show_all = True
        self.multi_page = True
        self.paginator = paginator

然后在管理员中使用这个RawChangeList

@admin.register(MyModel)
class MyModelAdmin(admin.ModelAdmin):

    def get_changelist(self, request, **kwargs):
        return RawChangeList

    def get_queryset(self, request):
        return MyModel.objects.raw('SELECT * FROM my_app_my_model')

    def get_object(self, request, object_id, from_field=None):
        return MyModel.objects.raw('SELECT * FROM my_app_my_model WHERE id=%s', (object_id, ))[0]