PostgreSQL 的分布式处理 table

Distributed processing of a PostgreSQL table

我有一个 PostgreSQL table 有数百万行需要用相同的算法处理。 我正在使用 Python 和 SQLAlchemy.Core 来完成这项任务。

此算法接受一行或多行作为输入,并且return接受具有一些更新值的相同数量的行。

id1, id2, NULL, NULL, NULL -> id1, id2, value1, value2, value3
id1, id3, NULL, NULL, NULL -> id1, id3, value4, value5, value6
id2, id3, NULL, NULL, NULL -> id2, id3, value7, value8, value9
...
id_n, id_m, NULL, NULL, NULL -> id_n, id_m, value_xxx, value_yyy, value_zzz

我正在使用 PC 集群来执行此任务。此集群运行 dask.distributed 个调度程序和工作程序。

我认为,这个任务可以通过 map 功能有效地实现。我的想法是每个工作人员查询数据库,选择处理一些具有 NULL 值的行,然后用结果更新它们。

我的问题是:如何编写 SQL 查询,以便在工作人员之间分配 table 的片段?

我尝试在 SQL 查询中使用 offsetlimit 为每个工作人员定义行的子集,每个工作人员发出:

SQL:

select * from table where value1 is NULL offset N limit 100;
...
update table where id1 = ... and id2 = ...
       set value1 = value...; 

Python:

from sqlalchemy import create_engine, bindparam, select, func
from distributed import Executor, progress

def process(offset, limit):
    engine = create_engine(...)

    # get next piece of work
    query = select(...).where(...).limit(limit).offset(offset)

    rows = engine.execute([select]).fetchall()
    # process rows

    # submit values to table
    update_stmt = table.update().where(...).where(...).values(...) 
    up_values = ...
    engine.execute(update_stmt, up_values) 

if __name__ == '__main__':
    e = Executor('{address}:{port}'.format(address=config('SERVER_ADDR'),
                                       port=config('SERVER_PORT')))
    n_rows = count_rows_to_process()
    chunk_size = 100
    progress(e.map(process, range(0, n_rows, chunk_size)))

但是,这没有用。

range 函数在计算开始之前有 returned 偏移量列表,并且 map 函数在开始 process 函数之前已经将它们分配给工作人员。

然后一些工作人员成功地完成了他们的工作块,将他们的结果提交给 table,并更新了值。

然后新的迭代开始,新的SELECT ...WHERE value1 is NULL LIMIT 100 OFFSET ...查询被发送到数据库,但是偏移量现在是无效的,因为它是在之前的工作人员更新table之前计算的。 NULL 值的数量现在减少了,工作人员可以从数据库接收空集。

我不能在开始计算之前使用一个 SELECT 查询,因为它会 return 巨大 table 不适合 RAM。

SQLAlchemy 手册还指出,对于分布式处理,应该为每个 python 进程在本地创建引擎实例。因此,我无法一次查询数据库并将 returned 游标发送到 process 函数。

因此,解决方案是正确构建 SQL 查询。

要考虑的一个选项是随机化:

SELECT *
FROM table
WHERE value1 IS NULL
ORDER BY random()
LIMIT 100;

在最坏的情况下,您将有多个工作人员并行计算同一件事。如果它不打扰你,这是最简单的方法之一。

另一种选择是将单独的行专用于特定的工作人员:

UPDATE table 
SET value1 = -9999 
WHERE id IN (
    SELECT id 
    FROM table 
    WHERE value1 IS NULL 
    ORDER BY random() 
    LIMIT 100
) RETURNING * ;

这样您 "mark" 您的特定工作人员的行数 "taken" 为 -9999。所有其他工作人员将跳过这些行,因为 value1 不再为 NULL。这里的风险是,如果工作人员失败,您将没有简单的方法来返回这些行 - 您必须手动将它们更新回 NULL。