mongodb node cursor not found with timeout false
I have a nodejs/express server where I am trying to merge and sort the sorted results from multiple mongodb collections in order to create a sorted CSV file. The way I achieve this requires that I keep the mongodb cursors alive (no timeout) until I read/exhaust all the data, or until an error occurs, in which case I have to close them manually. It seems to work when there are not many data points, but when the mongo query requests data for a whole year, for example, at some point after nearly half an hour I get the following mongo error: Cursor not found: cursor id: 59427962835.
Promise is a bluebird promise. The code is written in TypeScript.
import * as _ from 'lodash';
import * as moment from 'moment-timezone';
function findNative(db, collection, spec={}) {
const {query, fields, sort, limit, skip, hint, timeout=true} = spec;
// internal function that gets a connection from the connection pool
// returns promise with connection
return ensureConnection(db)
.then(connection => {
const cursor = connection.collection(collection).find(
query || {},
{fields, sort, limit, skip, hint, timeout});
// For sorted queries we have to limit batchSize
// see https://jira.mongodb.org/browse/SERVER-14228
if (connection.serverConfig.capabilities().maxWireVersion == 0 && sort && !limit) {
cursor.batchSize(0);
}
return cursor;
});
}
function getMongoStream(col, startdate, enddate) {
return findNative('testDb', col, {
query: { t: { $gte: startdate, $lte: enddate }},
sort: { t: 1 },
fields: { i: 0, _id: 0 },
timeout: false
});
}
async function fetchNextCursorData(cursor) {
const hasMore = await cursor.hasNext();
console.log(hasMore, cursor.cursorState.cursorId.toString());
return hasMore ? cursor.next() : Promise.resolve(null);
}
function findEarliestDate(buffer: any[]): [string, number[]] {
let earliestDateMS;
const indices = _(buffer)
.map(x => x && x.t.getTime())
.forEach(t => {
// make sure timestamp is defined
// buffer also contains null values
if(t && (!earliestDateMS || (earliestDateMS && t < earliestDateMS))) {
earliestDateMS = t;
}
})
.reduce((acc, t, i) => {
if(t === earliestDateMS) {
acc.push(i);
}
return acc;
}, []);
return [moment(earliestDateMS).utc().format('YYYY-MM-DD HH:mm:ss.SSS'), indices];
}
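To make the contract of findEarliestDate concrete, here is a small illustration (the buffer contents are hypothetical, invented just for this example): given three buffered docs where two share the earliest timestamp, it returns that timestamp formatted in UTC plus the indices of both docs, which tells the main loop which cursors to advance next.

// Hypothetical data, for illustration only
const buffer = [
    { t: new Date('2017-06-01T00:00:00.500Z'), data: { a: 1 } },
    { t: new Date('2017-06-01T00:00:00.250Z'), data: { b: 2 } },
    { t: new Date('2017-06-01T00:00:00.250Z'), data: { c: 3 } }
];
const [date, indices] = findEarliestDate(buffer);
// date    === '2017-06-01 00:00:00.250'
// indices === [1, 2]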
function closeAllCursors(cursors: any[]) {
const openCursors = cursors
.filter(c => !c.isClosed());
openCursors.forEach(c => c.close());
}
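One small note on closeAllCursors: in driver 2.x, cursor.close() returns a promise when no callback is given, so close failures are silently dropped here. If you want to observe them, a minimal sketch with otherwise identical behavior:

function closeAllCursors(cursors: any[]) {
    return Promise.all(
        cursors
            .filter(c => !c.isClosed())
            // close() returns a promise in driver 2.x; log failures instead of dropping them
            .map(c => c.close().catch(err => console.error(err)))
    );
}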
async function csvData(req, res) {
const collections: string[] = req.swagger.params.collections.value.split(',').sort(),
sources: string[] = req.swagger.params.sources.value.split(',').sort(),
startdate = new Date(Number(req.swagger.params.startdate.value)),
enddate = new Date(Number(req.swagger.params.enddate.value));
const filename = `${moment.utc().format('YYYY-MM-DD_HH:mm')}.csv`;
res.set({
'Content-Type': 'text/csv',
'Content-Disposition': `attachment; filename="${filename}"`
});
res.write('Date UTC,' + sources.join(',') + '\n');
const colPromises = collections.map(col => getMongoStream(col, startdate, enddate));
let cursorsMap: { [rec: string]: any; };
try {
let buffer = [], dateCSVBuffer: any[] = _.fill(Array(sources.length), '');
// fetch first doc from all cursors
const cursors = await Promise.all(colPromises);
cursorsMap = _.zipObject<any>(collections, cursors);
let docs = await Promise.all(cursors.map(fetchNextCursorData));
// initial request made for all collections
let requestedIdx = _.range(0, collections.length);
while(true) {
docs.forEach((doc, i) => {
buffer[requestedIdx[i]] = doc;
});
// null indicates that cursor won't return more data =>
// all cursors are exhausted
if(buffer.every(d => d === null)) {
break;
}
const [date, indices] = findEarliestDate(buffer);
requestedIdx = indices;
indices.forEach(idx => {
// update csv buffer
const {data} = buffer[idx];
Object.keys(data)
.forEach(ch => {
const sourceIndex = sources.indexOf(ch);
if(sourceIndex > -1) {
dateCSVBuffer[sourceIndex] = data[ch];
}
});
// remove doc from buffer
buffer[idx] = null;
});
// send csv string
dateCSVBuffer.unshift(date);
res.write(dateCSVBuffer.join(',') + '\n');
// empty buffer
dateCSVBuffer = dateCSVBuffer.map(() => '');
// request new entry from cursors
const nextDocPromises = indices
.map(idx => cursorsMap[collections[idx]])
.map(fetchNextCursorData);
docs = await Promise.all(nextDocPromises);
}
// end data stream
res.end();
} catch(err) {
// make sure to close all cursors
// will catch all nested promise errors
closeAllCursors(_.values(cursorsMap));
console.error(err);
res.status(500).json(err);
}
}
The mongodb connection is created with the following options:
{
auto_reconnect: true,
poolSize: 30,
connectTimeoutMS: 90000
}
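For context, with the 2.x driver these options are typically handed to MongoClient.connect. A minimal sketch of what ensureConnection might do under the hood, assuming a standalone server (the URL and database name are placeholders; the question does not show the real ensureConnection):

import { MongoClient } from 'mongodb';

// Sketch only: URL and db name are placeholders, not from the question
MongoClient.connect('mongodb://localhost:27017/testDb', {
    server: {
        auto_reconnect: true,
        poolSize: 30,
        socketOptions: { connectTimeoutMS: 90000 }
    }
}).then(connection => {
    // driver 2.x resolves with a Db instance; cache it for the pool
});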
Could the problem be that I keep the cursor references in a map, so they are not updated? Is the cursor already dead by the time I call cursor.hasNext()? I also tried checking cursor.isClosed(), but it always returns false.
The mongodb driver is "mongodb": "2.2.15" and the queries were tested against a v3.0 database.
Edit: I did a small count test to see how many documents had been processed by the time the program crashed. The 3 cursors (the test case only requested data from 3 collections) had the following counts and ids:
3097531 '59427962835'
31190333 '53750510295'
32007475 '101213786015'
The last document processed from the cursor with id '59427962835' was number 4101, so it was nowhere near finished.
It turns out that passing the timeout option to the find query does not work. I had to use the noCursorTimeout cursor flag like this:
const cursor = connection.collection(collection)
.find(query || {}, {fields, sort, limit, skip, hint})
.addCursorFlag('noCursorTimeout', !timeout);
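For completeness, this is how the fix slots into the findNative function from the question (unchanged apart from the cursor flag; the batchSize workaround is omitted here for brevity). Note that cursors opened with noCursorTimeout must be closed manually, which is what closeAllCursors in the catch block already does; by default the server reaps idle cursors after about 10 minutes (the cursorTimeoutMillis server parameter), which fits the error appearing long before the slow cursor was exhausted.

function findNative(db, collection, spec={}) {
    const {query, fields, sort, limit, skip, hint, timeout=true} = spec;
    return ensureConnection(db)
        .then(connection => {
            // noCursorTimeout must be set as a cursor flag;
            // passing {timeout: false} as a find() option had no effect here
            return connection.collection(collection)
                .find(query || {}, {fields, sort, limit, skip, hint})
                .addCursorFlag('noCursorTimeout', !timeout);
        });
}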