如何在 psycopg2 中使用服务器端游标

How to use server side cursors with psycopg2

我有一个有 400 万行的 table,我使用 psycopg2 执行一个:

   SELECT * FROM ..WHERE query

我以前从未听说过服务器端游标,我正在阅读它,当您期望获得大量结果时这是一个很好的做法。

我发现文档有些局限,而且我有一些基本问题。

首先我将服务器端游标声明为:

cur = conn.cursor('cursor-name')

然后我将查询执行为:

cur.itersize = 10000
sqlstr = "SELECT clmn1, clmn2 FROM public.table WHERE clmn1 LIKE 'At%'"
cur.execute(sqlstr)

我的问题是:我现在该怎么办?我如何获得结果?

我是否按以下方式遍历行:

row = cur.fetchone()
while row:
   row = cur.fetchone()

或者我使用 fetchmany() 并这样做:

row = cur.fetchmany(10)

但在第二种情况下,我如何"scroll"得到结果?

另外itersize有什么意义?

除了 cur.fetchmany(n),您还可以使用 PostgreSQL cursors:

cur.execute("declare foo cursor for select * from generate_series(1,1000000)")
cur.execute("fetch forward 100 from foo")
rows = cur.fetchall()
# ...
cur.execute("fetch forward 100 from foo")
rows = cur.fetchall()
# and so on

Psycopg2 有一个很好的界面来处理服务器端游标。这是一个可能使用的模板:

with psycopg2.connect(database_connection_string) as conn:
    with conn.cursor(name='name_of_cursor') as cursor:

        cursor.itersize = 20000

        query = "SELECT * FROM ..."
        cursor.execute(query)

        for row in cursor:
            # process row 

上面的代码创建连接并自动将查询结果放入服务器端游标。值 itersize 设置客户端一次从服务器端游标中拉下的行数。您使用的值应该平衡网络调用的数量与客户端上的内存使用量。例如,如果您的结果计数为三百万,则 itersize 值为 2000(默认值)将导致 1500 次网络调用。如果2000行消耗的内存少,就增加这个数字。

当使用 for row in cursor 时,您当然是一次处理一行,但 Psycopg2 会为您一次预取 itersize 行。

如果你出于某种原因想使用 fetchmany,你可以这样做:

while True:
    rows = cursor.fetchmany(100)
    if len(rows) > 0:
        for row in rows:
            # process row
    else:
        break

fetchmany 的这种用法不会触发对服务器的网络调用以获取更多行,直到预取的批次用完为止。 (这是一个复杂的例子,上面的代码没有提供任何内容,但演示了如何在需要时使用 fetchmany。)

当我不想一次加载数百万行时,我倾向于这样做。如果将数百万行加载到内存中,您可以将程序变成内存消耗大户。特别是如果您要从这些行或类似的东西中创建 python 域对象。我不确定名称中的 uuid4 是否必要,但我的想法是,如果两个进程进行相同的查询,我希望单个服务器端游标不重叠。

from uuid import uuid4
import psycopg2

def fetch_things() -> Iterable[MyDomainObject]:
    with psycopg2.connect(database_connection_string) as conn:
        with conn.cursor(name=f"my_name_{uuid4()}") as cursor:
            cursor.itersize = 500_000

            query = "SELECT * FROM ..."
            cursor.execute(query)

            for row in cursor:
                yield MyDomainObject(row)

如果有人知道这是否会在 SQL 服务器或类似问题上造成存储问题,我很感兴趣。