如何从 django/sqlalchemy 处理的数据库中批量获取模型对象
How to bulk fetch model objects from database handled by django/sqlalchemy
最近我遇到了以下问题:如何迭代真正的大数据查询以执行操作(比如为每个对象创建两个不同的对象)。
如果你处理一个小的查询集,这很简单:
for obj in Mymodel.objects.all():
create_corresponding_entries(obj)
现在尝试在具有 900k 个对象的查询集中执行此操作。可能您的电脑会死机,因为它会耗尽所有内存。那么我怎样才能懒惰地实现这一目标呢?无论你使用 Django ORM 还是 SQLAlchemy
都会出现同样的问题
尽管 Django ORM 提供了一个 "lazy" 查询集,但我一直在寻找的是一个 生成器 ,它可以为我提供一种延迟获取对象的方法。 django 中的查询集并不是真正的惰性,它们是惰性的,直到您尝试访问它们,数据库将命中并获取 1M 条目。 SQL炼金术也是如此。如果您有 oracle 或 postgre 数据库,那么您很幸运,您可以使用受支持的服务器端游标。 SQL如果您使用 mysqldb 或 pymysql 方言,SQLAlchemy 也支持这些加上 mysql。我不确定服务器端游标在幕后是如何工作的。
有关
的更多信息
- Django ORM:https://www.niwi.nz/2012/10/22/server-side-cursors-with-postgresql-and-django/
- SQL炼金术:http://docs.sqlalchemy.org/en/latest/orm/query.html#sqlalchemy.orm.query.Query.yield_per
因此,如果您不适合上述任何一种情况,则必须想出一种方法来延迟获取这些对象。因为 Django ORM 和 SQLAlchemy 都支持通过将其转换为纯 SQL 查询来进行切片,所以我想我可以使用自定义生成器来对我需要的查询进行切片。
免责声明: 该解决方案试图解决在本地转储大量数据的问题,它不会尝试最大化查询性能或与数据库相关的任何性能。
警告:与简单的 Mymodel.objects.all()
相比,这将导致对数据库的查询更多,但对您的 RAM 的挑战较小。
def lazy_bulk_fetch(max_obj, max_count, fetch_func, start=0):
counter = start
while counter < max_count:
yield fetch_func()[counter:counter + max_obj]
counter += max_obj
然后使用它,例如:
fetcher = lazy_bulk_fetch(50, Mymodel.objects.count(), lambda: Mymodel.objects.order_by('id'))
for batch in fetcher:
make_actions(batch)
这将为我每次迭代获取 50 个对象的列表,直到达到我想要的最大数量。如果您在 Django 中将 make_actions(batch)
更改为 print(batch.query)
,您将看到如下内容:
SELECT "services_service"."id" FROM "services_service" LIMIT 50
SELECT "services_service"."id" FROM "services_service" LIMIT 50 OFFSET 50
SELECT "services_service"."id" FROM "services_service" LIMIT 50 OFFSET 100
SELECT "services_service"."id" FROM "services_service" LIMIT 50 OFFSET 150
slice
和 SQLAlchemy supports 可以使用相同的概念。在这种情况下的解决方案是相同的,但不是 python 切片,而是使用 SQLAlchemy 查询对象
的 slice
函数
编辑: 据我所见,SQLAlchemy 查询 class 实现了 __getitem__
函数。 因此,对于 SQLAlchemy,您可以使用我为 Django 建议的完全相同的函数。 如果您想显式使用 slice
函数,您最终会得到类似以下:
def lazy_bulk_fetch(max_obj, max_count, fetch_func, start=0):
counter = start
while counter < max_count:
yield fetch_func().slice(counter, counter + max_obj)
counter += max_obj
无论如何你会这样称呼它:
from sqlalchemy import func
fetcher = lazy_bulk_fetch(50, session.query(func.count(Mymodel.id)),
lambda: session.query(Mymodel).order_by(Mymodel.id))
这里有两个注意事项:
- 您想使用
func.count
以便将其转换为服务器中的 COUNT
SQL 语句。如果你使用 len(session.query(Mymodel))
你会在本地转储所有东西,找到它的长度然后扔掉它
我使用 lambda
这样实现就像 django 一样。我也可以
lazy_bulk_fetch(50, session.query(func.count(Mymodel.id)),
session.query(Mymodel).order_by(Mymodel.id))
但我必须在我的函数中包含
yield fetch_func.slice(counter, counter + max_obj)
编辑 #2: 我添加了排序,否则你无法确定你不会在第 N 个 运行 中得到相同的结果。订购保证您将获得独特的结果。最好将 id 作为排序键,否则你不能确定你错过了一个结果(因为在第 N 次命中时,可能已经添加了一个新条目并且没有 id 的排序可能会导致你错过它或得到双重条目)
如果将处理卸载到数据库(通过 Django ORM),整个操作可以在 3 个数据库调用中完成:
- 调用values_list获取所有主键的列表。每个 64 字节的 900K 键,它应该仍然只占用大约 56 MB 的内存,这不应该让你的系统承受过度的压力。
model_ids = MyModel.objects.values_list('id', flat=True)
- 现在,决定您希望一次加载多少条目。如果您使用
values_list
的子集调用 in_bulk,您可以按您的系统舒适的块来处理它。对于所有条目,将 CHUNK_SIZE
设置为 len(model_ids)
。 (“3 个数据库调用”注释仅在您使用 CHUNK_SIZE > len(model_ids)
调用 in_bulk 时有效。内存负载将取决于 MyModel 的大小,并且 CPU 负载应该是最小的。)
for counter in range(0, len(model_ids), CHUNK_SIZE):
chunk = MyModel.objects.in_bulk(model_ids[counter:counter+CHUNK_SIZE])
# Do whatever you wish with this chunk, like create the objects but in place.
- 最后一部分是您创建其他对象的地方。这是使用 bulk_create 的理想位置,这将使整个过程更加高效。即使您不使用
in_bulk
和 values_list
,如果您要创建超过 2-3 个对象,bulk_create
也会给您带来显着的优势。
结合步骤 2 中的代码,您可以执行以下操作:
objs_to_create = []
for counter in range(0, len(model_ids), CHUNK_SIZE):
chunk = MyModel.objects.in_bulk(model_ids[counter:counter+CHUNK_SIZE])
# Populate the object(s), either directly or in loop, but using MyModel
# constructor, not ORM query. That is, use
# m = MyModel(..)
# instead of
# m = MyModel.objects.create(..)
# Append each of the created MyModel python objects to objs_to_create. Note
# that we have not created these objects in the database yet.
# ...
# Now create these objects in database using a single call
MyModel.objects.create_bulk(objs_to_create)
# Rinse and repeat
objs_to_create = []
不再有 CPU 挂起,您可以根据自己的喜好微调内存使用量。
我不知道我是否误解了你的问题,或者答案是否在当前版本的 Django 之前,但对于 Django,请参阅:https://docs.djangoproject.com/en/dev/ref/models/querysets/#iterator
for i in Mymodel.objects.iterator(chunk_size=2000):
print(i)
正如在某些数据库的文档中一样,它是通过 RDBMS 上的游标在其他一些数据库上使用一些技巧实现的。
基于@John Paraskevopoulos 对 Django ORM 的回答,我对其进行了一些调整,使其可能更通用一些:
def bulkFetch(Cls, batchSize: int = 100, start: int = 0, end: int = None, fetchFunc: Callable = None):
'''
Query Django model instances and retrieve the instances lazily in batches.
Params:
- Cls: the Django model class
- batchSize: number of instances to yield each iteration
- start: start number to yield from the queryset
- end: end order number to yield from the queryset
- fetchFunc: a function to retrieve instances. By default set to None: all model instances of the given class will be retrieved.
'''
counter = start
maxCount = Cls.objects.count()
if end is not None and end < maxCount:
maxCount = end
def defaultFetchFunc():
qs = Cls.objects.order_by('pk')
if end is None:
return qs
else:
return qs[:end]
if fetchFunc is None:
fetchFunc = defaultFetchFunc
while counter < maxCount:
yield fetchFunc()[counter:counter+batchSize]
counter += batchSize
最近我遇到了以下问题:如何迭代真正的大数据查询以执行操作(比如为每个对象创建两个不同的对象)。 如果你处理一个小的查询集,这很简单:
for obj in Mymodel.objects.all():
create_corresponding_entries(obj)
现在尝试在具有 900k 个对象的查询集中执行此操作。可能您的电脑会死机,因为它会耗尽所有内存。那么我怎样才能懒惰地实现这一目标呢?无论你使用 Django ORM 还是 SQLAlchemy
都会出现同样的问题尽管 Django ORM 提供了一个 "lazy" 查询集,但我一直在寻找的是一个 生成器 ,它可以为我提供一种延迟获取对象的方法。 django 中的查询集并不是真正的惰性,它们是惰性的,直到您尝试访问它们,数据库将命中并获取 1M 条目。 SQL炼金术也是如此。如果您有 oracle 或 postgre 数据库,那么您很幸运,您可以使用受支持的服务器端游标。 SQL如果您使用 mysqldb 或 pymysql 方言,SQLAlchemy 也支持这些加上 mysql。我不确定服务器端游标在幕后是如何工作的。
有关
的更多信息- Django ORM:https://www.niwi.nz/2012/10/22/server-side-cursors-with-postgresql-and-django/
- SQL炼金术:http://docs.sqlalchemy.org/en/latest/orm/query.html#sqlalchemy.orm.query.Query.yield_per
因此,如果您不适合上述任何一种情况,则必须想出一种方法来延迟获取这些对象。因为 Django ORM 和 SQLAlchemy 都支持通过将其转换为纯 SQL 查询来进行切片,所以我想我可以使用自定义生成器来对我需要的查询进行切片。
免责声明: 该解决方案试图解决在本地转储大量数据的问题,它不会尝试最大化查询性能或与数据库相关的任何性能。
警告:与简单的 Mymodel.objects.all()
相比,这将导致对数据库的查询更多,但对您的 RAM 的挑战较小。
def lazy_bulk_fetch(max_obj, max_count, fetch_func, start=0):
counter = start
while counter < max_count:
yield fetch_func()[counter:counter + max_obj]
counter += max_obj
然后使用它,例如:
fetcher = lazy_bulk_fetch(50, Mymodel.objects.count(), lambda: Mymodel.objects.order_by('id'))
for batch in fetcher:
make_actions(batch)
这将为我每次迭代获取 50 个对象的列表,直到达到我想要的最大数量。如果您在 Django 中将 make_actions(batch)
更改为 print(batch.query)
,您将看到如下内容:
SELECT "services_service"."id" FROM "services_service" LIMIT 50
SELECT "services_service"."id" FROM "services_service" LIMIT 50 OFFSET 50
SELECT "services_service"."id" FROM "services_service" LIMIT 50 OFFSET 100
SELECT "services_service"."id" FROM "services_service" LIMIT 50 OFFSET 150
slice
和 SQLAlchemy supports 可以使用相同的概念。在这种情况下的解决方案是相同的,但不是 python 切片,而是使用 SQLAlchemy 查询对象
slice
函数
编辑: 据我所见,SQLAlchemy 查询 class 实现了 __getitem__
函数。 因此,对于 SQLAlchemy,您可以使用我为 Django 建议的完全相同的函数。 如果您想显式使用 slice
函数,您最终会得到类似以下:
def lazy_bulk_fetch(max_obj, max_count, fetch_func, start=0):
counter = start
while counter < max_count:
yield fetch_func().slice(counter, counter + max_obj)
counter += max_obj
无论如何你会这样称呼它:
from sqlalchemy import func
fetcher = lazy_bulk_fetch(50, session.query(func.count(Mymodel.id)),
lambda: session.query(Mymodel).order_by(Mymodel.id))
这里有两个注意事项:
- 您想使用
func.count
以便将其转换为服务器中的COUNT
SQL 语句。如果你使用len(session.query(Mymodel))
你会在本地转储所有东西,找到它的长度然后扔掉它 我使用
lambda
这样实现就像 django 一样。我也可以lazy_bulk_fetch(50, session.query(func.count(Mymodel.id)), session.query(Mymodel).order_by(Mymodel.id))
但我必须在我的函数中包含
yield fetch_func.slice(counter, counter + max_obj)
编辑 #2: 我添加了排序,否则你无法确定你不会在第 N 个 运行 中得到相同的结果。订购保证您将获得独特的结果。最好将 id 作为排序键,否则你不能确定你错过了一个结果(因为在第 N 次命中时,可能已经添加了一个新条目并且没有 id 的排序可能会导致你错过它或得到双重条目)
如果将处理卸载到数据库(通过 Django ORM),整个操作可以在 3 个数据库调用中完成:
- 调用values_list获取所有主键的列表。每个 64 字节的 900K 键,它应该仍然只占用大约 56 MB 的内存,这不应该让你的系统承受过度的压力。
model_ids = MyModel.objects.values_list('id', flat=True)
- 现在,决定您希望一次加载多少条目。如果您使用
values_list
的子集调用 in_bulk,您可以按您的系统舒适的块来处理它。对于所有条目,将CHUNK_SIZE
设置为len(model_ids)
。 (“3 个数据库调用”注释仅在您使用CHUNK_SIZE > len(model_ids)
调用 in_bulk 时有效。内存负载将取决于 MyModel 的大小,并且 CPU 负载应该是最小的。)
for counter in range(0, len(model_ids), CHUNK_SIZE):
chunk = MyModel.objects.in_bulk(model_ids[counter:counter+CHUNK_SIZE])
# Do whatever you wish with this chunk, like create the objects but in place.
- 最后一部分是您创建其他对象的地方。这是使用 bulk_create 的理想位置,这将使整个过程更加高效。即使您不使用
in_bulk
和values_list
,如果您要创建超过 2-3 个对象,bulk_create
也会给您带来显着的优势。 结合步骤 2 中的代码,您可以执行以下操作:
objs_to_create = []
for counter in range(0, len(model_ids), CHUNK_SIZE):
chunk = MyModel.objects.in_bulk(model_ids[counter:counter+CHUNK_SIZE])
# Populate the object(s), either directly or in loop, but using MyModel
# constructor, not ORM query. That is, use
# m = MyModel(..)
# instead of
# m = MyModel.objects.create(..)
# Append each of the created MyModel python objects to objs_to_create. Note
# that we have not created these objects in the database yet.
# ...
# Now create these objects in database using a single call
MyModel.objects.create_bulk(objs_to_create)
# Rinse and repeat
objs_to_create = []
不再有 CPU 挂起,您可以根据自己的喜好微调内存使用量。
我不知道我是否误解了你的问题,或者答案是否在当前版本的 Django 之前,但对于 Django,请参阅:https://docs.djangoproject.com/en/dev/ref/models/querysets/#iterator
for i in Mymodel.objects.iterator(chunk_size=2000):
print(i)
正如在某些数据库的文档中一样,它是通过 RDBMS 上的游标在其他一些数据库上使用一些技巧实现的。
基于@John Paraskevopoulos 对 Django ORM 的回答,我对其进行了一些调整,使其可能更通用一些:
def bulkFetch(Cls, batchSize: int = 100, start: int = 0, end: int = None, fetchFunc: Callable = None):
'''
Query Django model instances and retrieve the instances lazily in batches.
Params:
- Cls: the Django model class
- batchSize: number of instances to yield each iteration
- start: start number to yield from the queryset
- end: end order number to yield from the queryset
- fetchFunc: a function to retrieve instances. By default set to None: all model instances of the given class will be retrieved.
'''
counter = start
maxCount = Cls.objects.count()
if end is not None and end < maxCount:
maxCount = end
def defaultFetchFunc():
qs = Cls.objects.order_by('pk')
if end is None:
return qs
else:
return qs[:end]
if fetchFunc is None:
fetchFunc = defaultFetchFunc
while counter < maxCount:
yield fetchFunc()[counter:counter+batchSize]
counter += batchSize