如何从 django/sqlalchemy 处理的数据库中批量获取模型对象

How to bulk fetch model objects from database handled by django/sqlalchemy

最近我遇到了以下问题:如何迭代真正的大数据查询以执行操作(比如为每个对象创建两个不同的对象)。 如果你处理一个小的查询集,这很简单:

for obj in Mymodel.objects.all():
    create_corresponding_entries(obj)

现在尝试在具有 900k 个对象的查询集中执行此操作。可能您的电脑会死机,因为它会耗尽所有内存。那么我怎样才能懒惰地实现这一目标呢?无论你使用 Django ORM 还是 SQLAlchemy

都会出现同样的问题

尽管 Django ORM 提供了一个 "lazy" 查询集,但我一直在寻找的是一个 生成器 ,它可以为我提供一种延迟获取对象的方法。 django 中的查询集并不是真正的惰性,它们是惰性的,直到您尝试访问它们,数据库将命中并获取 1M 条目。 SQL炼金术也是如此。如果您有 oracle 或 postgre 数据库,那么您很幸运,您可以使用受支持的服务器端游标。 SQL如果您使用 mysqldb 或 pymysql 方言,SQLAlchemy 也支持这些加上 mysql。我不确定服务器端游标在幕后是如何工作的。

有关

的更多信息

因此,如果您不适合上述任何一种情况,则必须想出一种方法来延迟获取这些对象。因为 Django ORM 和 SQLAlchemy 都支持通过将其转换为纯 SQL 查询来进行切片,所以我想我可以使用自定义生成器来对我需要的查询进行切片。

免责声明: 该解决方案试图解决在本地转储大量数据的问题,它不会尝试最大化查询性能或与数据库相关的任何性能。

警告:与简单的 Mymodel.objects.all() 相比,这将导致对数据库的查询更多,但对您的 RAM 的挑战较小。

def lazy_bulk_fetch(max_obj, max_count, fetch_func, start=0):
    counter = start
    while counter < max_count:
        yield fetch_func()[counter:counter + max_obj]
        counter += max_obj

然后使用它,例如:

fetcher = lazy_bulk_fetch(50, Mymodel.objects.count(), lambda: Mymodel.objects.order_by('id'))
for batch in fetcher:
    make_actions(batch)

这将为我每次迭代获取 50 个对象的列表,直到达到我想要的最大数量。如果您在 Django 中将 make_actions(batch) 更改为 print(batch.query),您将看到如下内容:

SELECT "services_service"."id" FROM "services_service" LIMIT 50
SELECT "services_service"."id" FROM "services_service" LIMIT 50 OFFSET 50
SELECT "services_service"."id" FROM "services_service" LIMIT 50 OFFSET 100
SELECT "services_service"."id" FROM "services_service" LIMIT 50 OFFSET 150

sliceSQLAlchemy supports 可以使用相同的概念。在这种情况下的解决方案是相同的,但不是 python 切片,而是使用 SQLAlchemy 查询对象

slice 函数

编辑: 据我所见,SQLAlchemy 查询 class 实现了 __getitem__ 函数。 因此,对于 SQLAlchemy,您可以使用我为 Django 建议的完全相同的函数。 如果您想显式使用 slice 函数,您最终会得到类似以下:

def lazy_bulk_fetch(max_obj, max_count, fetch_func, start=0):
    counter = start
    while counter < max_count:
        yield fetch_func().slice(counter, counter + max_obj)
        counter += max_obj

无论如何你会这样称呼它:

from sqlalchemy import func
fetcher = lazy_bulk_fetch(50, session.query(func.count(Mymodel.id)), 
                          lambda: session.query(Mymodel).order_by(Mymodel.id))

这里有两个注意事项:

  1. 您想使用 func.count 以便将其转换为服务器中的 COUNT SQL 语句。如果你使用 len(session.query(Mymodel)) 你会在本地转储所有东西,找到它的长度然后扔掉它
  2. 我使用 lambda 这样实现就像 django 一样。我也可以

    lazy_bulk_fetch(50, session.query(func.count(Mymodel.id)), 
                    session.query(Mymodel).order_by(Mymodel.id)) 
    

    但我必须在我的函数中包含

    yield fetch_func.slice(counter, counter + max_obj)
    

编辑 #2: 我添加了排序,否则你无法确定你不会在第 N 个 运行 中得到相同的结果。订购保证您将获得独特的结果。最好将 id 作为排序键,否则你不能确定你错过了一个结果(因为在第 N 次命中时,可能已经添加了一个新条目并且没有 id 的排序可能会导致你错过它或得到双重条目)

如果将处理卸载到数据库(通过 Django ORM),整个操作可以在 3 个数据库调用中完成:

  1. 调用values_list获取所有主键的列表。每个 64 字节的 900K 键,它应该仍然只占用大约 56 MB 的内存,这不应该让你的系统承受过度的压力。
model_ids = MyModel.objects.values_list('id', flat=True)
  1. 现在,决定您希望一次加载多少条目。如果您使用 values_list 的子集调用 in_bulk,您可以按您的系统舒适的块来处理它。对于所有条目,将 CHUNK_SIZE 设置为 len(model_ids)。 (“3 个数据库调用”注释仅在您使用 CHUNK_SIZE > len(model_ids) 调用 in_bulk 时有效。内存负载将取决于 MyModel 的大小,并且 CPU 负载应该是最小的。)
for counter in range(0, len(model_ids), CHUNK_SIZE):
     chunk = MyModel.objects.in_bulk(model_ids[counter:counter+CHUNK_SIZE])
# Do whatever you wish with this chunk, like create the objects but in place.
  1. 最后一部分是您创建其他对象的地方。这是使用 bulk_create 的理想位置,这将使整个过程更加高效。即使您不使用 in_bulkvalues_list,如果您要创建超过 2-3 个对象,bulk_create 也会给您带来显着的优势。 结合步骤 2 中的代码,您可以执行以下操作:
objs_to_create = []
for counter in range(0, len(model_ids), CHUNK_SIZE):
     chunk = MyModel.objects.in_bulk(model_ids[counter:counter+CHUNK_SIZE])
     # Populate the object(s), either directly or in loop, but using MyModel 
     # constructor, not ORM query. That is, use 
     # m = MyModel(..)
     # instead of 
     # m = MyModel.objects.create(..)
     # Append each of the created MyModel python objects to objs_to_create. Note 
     # that we have not created these objects in the database yet.
     # ...
     # Now create these objects in database using a single call
     MyModel.objects.create_bulk(objs_to_create)
     # Rinse and repeat
     objs_to_create = []

不再有 CPU 挂起,您可以根据自己的喜好微调内存使用量。

我不知道我是否误解了你的问题,或者答案是否在当前版本的 Django 之前,但对于 Django,请参阅:https://docs.djangoproject.com/en/dev/ref/models/querysets/#iterator

for i in Mymodel.objects.iterator(chunk_size=2000):
    print(i)

正如在某些数据库的文档中一样,它是通过 RDBMS 上的游标在其他一些数据库上使用一些技巧实现的。

基于@John Paraskevopoulos 对 Django ORM 的回答,我对其进行了一些调整,使其可能更通用一些:

def bulkFetch(Cls,  batchSize: int = 100, start: int = 0, end: int = None, fetchFunc: Callable = None):
    '''
    Query Django model instances and retrieve the instances lazily in batches.
    Params:
    - Cls: the Django model class
    - batchSize: number of instances to yield each iteration
    - start: start number to yield from the queryset
    - end: end order number to yield from the queryset
    - fetchFunc: a function to retrieve instances. By default set to None: all model instances of the given class will be retrieved.
    '''
    counter = start
    maxCount = Cls.objects.count()

    if end is not None and end < maxCount:
        maxCount = end

    def defaultFetchFunc():
        qs = Cls.objects.order_by('pk')
        if end is None:
            return qs
        else:
            return qs[:end]

    if fetchFunc is None:
        fetchFunc = defaultFetchFunc

    while counter < maxCount:
        yield fetchFunc()[counter:counter+batchSize]
        counter += batchSize