Django 从原始查询创建管理列表
Django create admin list from raw query
我愿意在管理员中创建查询集列表模板
查询集是通过 cursor.execute() 获得的,因此可能需要一个假模型。
基本上我只想利用 Django admin list 提供的分页和过滤功能。
这是我的代码的相关部分
models.py
class Query(object):
def __init__(self, sql):
self.sql = sql
def execute_query(self):
cursor = connection.cursor()
start_time = time()
try:
cursor.execute(self.sql)
except DatabaseError as e:
cursor.close()
raise e
return cursor, ((time() - start_time) * 1000)
admin.py
class QueryAdmin(admin.ModelAdmin):
....
admin.site.register(Query, QueryAdmin)
如果您只是尝试使用 django 管理功能来利用它的分页和过滤功能,您会发现自己花费的时间比使用 django pagination 时要多得多。首先,您会发现分页和过滤 将无法使用 和 cursor.execute
如果您确实需要某种管理功能,请继续阅读。
在 django admin 中使用自定义查询集的首选方法是覆盖 admin class.
中的 get_queryset 方法
The get_queryset method on a ModelAdmin returns a QuerySet of all
model instances that can be edited by the admin site. One use case for
overriding this method is to show objects owned by the logged-in user:
所以我们的代码会像
class QueryAdmin(admin.ModelAdmin):
def queryset(self, request, queryset):
return SomeModel.objects.raw('SOME QUERY')
如果您想 return 自定义原始查询集,您可以在此处通过对对象管理器使用原始方法调用来实现。它比使用 cursor.execute
更可取。您实际上需要这样的方法 (objects.raw) 而不是 execute 来使用一些 django 管理功能。
即使是现在你也会发现分页不起作用。然后你需要这样做:django pagination and RawQuerySet
我刚刚实现了这个功能。您需要新视图 RawChangeList
扩展 ChangeList
来自 Django 的视图:
from django.contrib.admin.views.main import ChangeList
from django.db import connections
class RawChangeList(ChangeList):
"""
Extended Django ChangeList to be able show data from RawQueryset.
"""
def get_count(self):
connection = connections[self.queryset.db]
with connection.cursor() as c:
if connection.vendor == 'microsoft': # CTE in subquery is not working in SQL Server
c.execute(self.queryset.raw_query)
c.execute('SELECT @@ROWCOUNT')
else:
query = 'SELECT COUNT(*) FROM ({query}) AS sq'
c.execute(query.format(query=self.queryset.raw_query))
return c.fetchone()[0]
def get_queryset_slice(self):
connection = connections[self.queryset.db]
if connection.vendor == 'microsoft':
# SQL Server needs ordered query for slicing
if hasattr(self.queryset, 'ordered') and self.queryset.ordered:
query = '{query}'
else:
query = '{query} ORDER BY 1'
query += ' OFFSET {offset} ROWS FETCH NEXT {limit} ROWS ONLY'
else:
query = '{query} LIMIT {limit} OFFSET {offset}'
return self.queryset.model.objects.raw(
query.format(
query=self.queryset.raw_query,
offset=self.page_num * self.list_per_page,
limit=(self.page_num + 1) * self.list_per_page - self.page_num * self.list_per_page,
)
)
def get_queryset(self, request):
"""
Overriding to avoid applying filters in ChangeList because RawQueryset has not filter method.
So any filters has to be applied manually for now.
"""
qs = self.root_queryset
if not hasattr(qs, 'count'):
qs.count = lambda: self.get_count()
return qs
def get_results(self, request):
if self.show_all:
qs = self.queryset
else:
qs = self.get_queryset_slice()
paginator = self.model_admin.get_paginator(request, self.queryset, self.list_per_page)
self.result_count = paginator.count
self.show_full_result_count = False
self.show_admin_actions = True
self.full_result_count = 0
self.result_list = list(qs)
self.can_show_all = True
self.multi_page = True
self.paginator = paginator
然后在管理员中使用这个RawChangeList
:
@admin.register(MyModel)
class MyModelAdmin(admin.ModelAdmin):
def get_changelist(self, request, **kwargs):
return RawChangeList
def get_queryset(self, request):
return MyModel.objects.raw('SELECT * FROM my_app_my_model')
def get_object(self, request, object_id, from_field=None):
return MyModel.objects.raw('SELECT * FROM my_app_my_model WHERE id=%s', (object_id, ))[0]
我愿意在管理员中创建查询集列表模板
查询集是通过 cursor.execute() 获得的,因此可能需要一个假模型。
基本上我只想利用 Django admin list 提供的分页和过滤功能。
这是我的代码的相关部分
models.py
class Query(object):
def __init__(self, sql):
self.sql = sql
def execute_query(self):
cursor = connection.cursor()
start_time = time()
try:
cursor.execute(self.sql)
except DatabaseError as e:
cursor.close()
raise e
return cursor, ((time() - start_time) * 1000)
admin.py
class QueryAdmin(admin.ModelAdmin):
....
admin.site.register(Query, QueryAdmin)
如果您只是尝试使用 django 管理功能来利用它的分页和过滤功能,您会发现自己花费的时间比使用 django pagination 时要多得多。首先,您会发现分页和过滤 将无法使用 和 cursor.execute
如果您确实需要某种管理功能,请继续阅读。
在 django admin 中使用自定义查询集的首选方法是覆盖 admin class.
中的 get_queryset 方法The get_queryset method on a ModelAdmin returns a QuerySet of all model instances that can be edited by the admin site. One use case for overriding this method is to show objects owned by the logged-in user:
所以我们的代码会像
class QueryAdmin(admin.ModelAdmin):
def queryset(self, request, queryset):
return SomeModel.objects.raw('SOME QUERY')
如果您想 return 自定义原始查询集,您可以在此处通过对对象管理器使用原始方法调用来实现。它比使用 cursor.execute
更可取。您实际上需要这样的方法 (objects.raw) 而不是 execute 来使用一些 django 管理功能。
即使是现在你也会发现分页不起作用。然后你需要这样做:django pagination and RawQuerySet
我刚刚实现了这个功能。您需要新视图 RawChangeList
扩展 ChangeList
来自 Django 的视图:
from django.contrib.admin.views.main import ChangeList
from django.db import connections
class RawChangeList(ChangeList):
"""
Extended Django ChangeList to be able show data from RawQueryset.
"""
def get_count(self):
connection = connections[self.queryset.db]
with connection.cursor() as c:
if connection.vendor == 'microsoft': # CTE in subquery is not working in SQL Server
c.execute(self.queryset.raw_query)
c.execute('SELECT @@ROWCOUNT')
else:
query = 'SELECT COUNT(*) FROM ({query}) AS sq'
c.execute(query.format(query=self.queryset.raw_query))
return c.fetchone()[0]
def get_queryset_slice(self):
connection = connections[self.queryset.db]
if connection.vendor == 'microsoft':
# SQL Server needs ordered query for slicing
if hasattr(self.queryset, 'ordered') and self.queryset.ordered:
query = '{query}'
else:
query = '{query} ORDER BY 1'
query += ' OFFSET {offset} ROWS FETCH NEXT {limit} ROWS ONLY'
else:
query = '{query} LIMIT {limit} OFFSET {offset}'
return self.queryset.model.objects.raw(
query.format(
query=self.queryset.raw_query,
offset=self.page_num * self.list_per_page,
limit=(self.page_num + 1) * self.list_per_page - self.page_num * self.list_per_page,
)
)
def get_queryset(self, request):
"""
Overriding to avoid applying filters in ChangeList because RawQueryset has not filter method.
So any filters has to be applied manually for now.
"""
qs = self.root_queryset
if not hasattr(qs, 'count'):
qs.count = lambda: self.get_count()
return qs
def get_results(self, request):
if self.show_all:
qs = self.queryset
else:
qs = self.get_queryset_slice()
paginator = self.model_admin.get_paginator(request, self.queryset, self.list_per_page)
self.result_count = paginator.count
self.show_full_result_count = False
self.show_admin_actions = True
self.full_result_count = 0
self.result_list = list(qs)
self.can_show_all = True
self.multi_page = True
self.paginator = paginator
然后在管理员中使用这个RawChangeList
:
@admin.register(MyModel)
class MyModelAdmin(admin.ModelAdmin):
def get_changelist(self, request, **kwargs):
return RawChangeList
def get_queryset(self, request):
return MyModel.objects.raw('SELECT * FROM my_app_my_model')
def get_object(self, request, object_id, from_field=None):
return MyModel.objects.raw('SELECT * FROM my_app_my_model WHERE id=%s', (object_id, ))[0]