pg-promise:运行 查询流中每一行的相关查询内存不足

pg-promise: running a dependent query for each row in a query stream runs out of memory

在我的应用程序中,我需要 运行 对 returns ~60k 行的查询中的每一行进行依赖更新。结果集太大,无法放入内存,因此自然的解决方案是对结果进行流式处理,然后 运行 依次对每个结果进行依赖查询。

无论我尝试什么,我的解决方案都是运行内存不足,尽管我希望流媒体可以让我保持低内存使用率。

经过大量阅读和重读 SO、pg-promise Wiki 的各个页面,并尝试使用不同的方法来实现它,我得出以下结论(简化我的代码):

try {
    const startTime = new Date();
    await db.tx("test-tx", async tx => {
        const qs = new QueryStream(`SELECT s.a AS i FROM GENERATE_SERIES(1, 100000) AS s(a)`);
        const result = await tx.stream(qs, stream => {
            return pgp.spex.stream.read(
                stream,
                async (i, row) => {
                    // console.log(`handling ${i}: ${JSON.stringify(data)}`);
                    await innerQuery(tx, row.i, startTime);
                },
                { readChunks: true }
            )
            .then(r => console.log("read done", r));
        });
        console.log("stream done", result);
    });
    console.log(`transaction done: ${memUsage()}MB, ${duration(startTime)} seconds`);
} catch (error) {
    console.error(error);
} finally {
    db.client.$pool.end();
}

async function innerQuery(tx, count, startTime) {
    if (count % 10000 === 0) {
        console.log(`row ${count}: ${memUsage()}MB, ${duration(startTime)} seconds`);
    }
    await tx.one("SELECT 1");
    if (count % 10000 === 0) {
        console.log(`inner query ${count} done`);
    }
}

function duration(startTime) {
    return Math.round((new Date() - startTime) / 1000);
}

function memUsage() {
    return Math.round(process.memoryUsage().heapUsed / 1024 / 1024);
}

这 运行 是预期的查询,但内存使用量一直在上升:

row 10000: 66MB, 1 seconds
row 20000: 124MB, 2 seconds
row 30000: 181MB, 3 seconds
row 40000: 241MB, 4 seconds
row 50000: 298MB, 5 seconds
row 60000: 355MB, 6 seconds
row 70000: 415MB, 7 seconds
row 80000: 474MB, 8 seconds
row 90000: 532MB, 9 seconds
row 100000: 593MB, 10 seconds
read done { calls: 100000, reads: 100000, length: 100000, duration: 10054 }
stream done { processed: 100000, duration: 10054 }
inner query 10000 done
inner query 20000 done
inner query 30000 done
inner query 40000 done
inner query 50000 done
inner query 60000 done
inner query 70000 done
inner query 80000 done
inner query 90000 done
inner query 100000 done
transaction done: 641MB, 42 seconds

现在,这里有些东西:注意 tx.stream 在所有内部查询解析之前调用 returns,打印到控制台。这解释了内存问题,所有这些闭包和承诺(其中 100k)都以某种方式在内存中等待流完成,以便它们自己可以解析并被 GC。

另一个数据点:如果我在顶层从db.tx更改为db.task,只有一个或两个内部查询运行在连接关闭之前进一步查询导致错误 (Querying against a released or lost connection.).

我也尝试过使用 tx.batch 并使用 readChunks: false 进行 stream.read 调用,但是在单个批处理后就停止并锁定。

那我做错了什么?如何让内部查询在完成后尽快解决,以便 GC 逐步回收内存?

据我所知,没有明显的方法可以降低查询流的速度以等待某些相关查询完成。新的内部查询的创建速度与结果的流式传输速度一样快,因此内存消耗急剧增加。

我找到了一个不使用 QueryStream 的解决方案。这使用服务器端游标,意味着所有查询 运行 串联。还没有探索尝试 运行 这些块并行以增加吞吐量,但它确实解决了内存问题。

const startTime = new Date();
await db.tx("test-tx", async tx => {
    await tx.none(`
        DECLARE test_cursor CURSOR FOR
        SELECT s.a AS i FROM GENERATE_SERIES(1, 100000) AS s(a)`);
    let row;
    while ((row = await tx.oneOrNone("FETCH NEXT FROM test_cursor"))) {
        await innerQuery(tx, row.i, startTime);
    }
    await tx.none("CLOSE test_cursor");
    console.log("outer query done");
});
console.log(`transaction done: ${memUsage()}MB, ${duration(startTime)} seconds`);

这输出类似

row 10000: 9MB, 5 seconds
inner query 10000 done
row 20000: 8MB, 9 seconds
inner query 20000 done
row 30000: 10MB, 14 seconds
inner query 30000 done
row 40000: 9MB, 19 seconds
inner query 40000 done
row 50000: 11MB, 23 seconds
inner query 50000 done
row 60000: 10MB, 28 seconds
inner query 60000 done
row 70000: 8MB, 33 seconds
inner query 70000 done
row 80000: 11MB, 38 seconds
inner query 80000 done
row 90000: 9MB, 43 seconds
inner query 90000 done
row 100000: 12MB, 48 seconds
inner query 100000 done
outer query done
transaction done: 12MB, 48 seconds