使用 Node.js 和 csv-parse 在 Google Pub/Sub 中发布的消息多于实际消息
Publishing more than actual messages in Google Pub/Sub using Node.js and csv-parse
使用 Node.js、Google Pub/Sub、csv-parse.
用例 -
我有一个大的 csv 文件要在我的数据库中处理和导入。它几乎没有第三方 API,需要 1 秒来处理每一行。所以流程如下 -
- 用户上传文件
- 节点服务器上传存储文件并发送消息到PubSubNo.1
- 现在我的监听器监听上面的 pubsub 并开始处理这些消息,它下载文件并开始分解每一行并发布到另一个 PubSub 以进行进一步处理
- 最后我并行处理这些较小的行消息并实现更快的处理。
问题 -
一旦我的听众下载了文件,它就会发送 x no。到下一个 PubSubNo2 的行消息,但是当我检查它的订阅时,它显示了多于 x 条消息。
例如我上传了一个 6000 条记录的 csv,它在订阅者上显示了超过 40K-50K 条消息。
Package.json
"dependencies": {
"@google-cloud/pubsub": "1.5.0",
"axios": "^0.19.2",
"csv-parse": "^4.8.5",
"dotenv": "^8.2.0",
"google-gax": "1.14.1",
"googleapis": "47.0.0",
"moment": "^2.24.0",
"path": "^0.12.7",
"pg": "^7.18.1",
"winston": "^3.0.0"
}
出版商代码
async processFile(filename) {
let cnt = 0;
let index = null;
let rowCounter = 0;
const handler = (resolve, reject) => {
const parser = CsvParser({
delimiter: ',',
})
.on('readable', () => {
let row;
let hello = 0;
let busy = false;
this.meta.totalRows = (parser.info.records - 1);
while (row = parser.read()) {
if (cnt++ === 0) {
index = row;
continue;
}
let messageObject = {
customFieldsMap: this.customFieldsMap,
importAttributes: this.jc.attrs,
importColumnData: row,
rowCount: cnt,
importColumnList: index,
authToken: this.token
}
let topicPublishResult = PubSubPublish.publishToTopic(process.env.IMPORT_CSV_ROW_PUBLISHING_TOPIC, messageObject);
topicPublishResult.then((response) => {
rowCounter += 1;
const messageInfo = "Row " + rowCounter + " published" +
" | MessageId = " + response +
" | importId = " + this.data.importId +
" | fileId = " + this.data.fileId +
" | orgId = " + this.data.orgId;
console.info(messageInfo);
})
}
})
.on('end', () => {
console.log("File consumed!");
resolve(this.setStatus("queued"))
})
.on('error', reject);
fs.createReadStream(filename).pipe(parser);
};
await new Promise(handler);
}
并发布模块代码
const {
PubSub
} = require('@google-cloud/pubsub');
const pubsub = new PubSub({
projectId: process.env.PROJECT_ID
});
module.exports = {
publishToTopic: function(topicName, data) {
return pubsub.topic(topicName, {
batching: {
maxMessages: 500,
maxMilliseconds: 5000,
}
}).publish(Buffer.from(JSON.stringify(data)));
},
};
这对于文件 os 10、100,200,2000 条记录没有任何问题,但对于更多的 6K 记录会出现问题。
在我发布 6K 记录后,所有 6K 记录都出现 UnhandledPromiseRejection 错误,例如
(node:49994) UnhandledPromiseRejectionWarning: Error: Retry total timeout exceeded before any response was received
at repeat (/Users/tarungupta/office/import-processor/node_modules/google-gax/build/src/normalCalls/retries.js:65:31)
at Timeout._onTimeout (/Users/tarungupta/office/import-processor/node_modules/google-gax/build/src/normalCalls/retries.js:100:25)
at listOnTimeout (internal/timers.js:531:17)
at processTimers (internal/timers.js:475:7)
(node:49994) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). (rejection id: 6000)
感谢任何帮助!
当您有 6,000 条消息要发布时,您的发布者可能会不知所措。原因是您为在 publishToTopic
方法中创建的每条消息创建了发布者的新实例。因此,您无法利用任何批处理,而是等待 5 秒来发送每条消息。每条消息的开销很大。这可能意味着回调没有得到及时处理,导致超时并尝试重新发送。您希望一次性创建 pubsub.topic
对象,然后在发布调用中重复使用它。
使用 Node.js、Google Pub/Sub、csv-parse.
用例 - 我有一个大的 csv 文件要在我的数据库中处理和导入。它几乎没有第三方 API,需要 1 秒来处理每一行。所以流程如下 -
- 用户上传文件
- 节点服务器上传存储文件并发送消息到PubSubNo.1
- 现在我的监听器监听上面的 pubsub 并开始处理这些消息,它下载文件并开始分解每一行并发布到另一个 PubSub 以进行进一步处理
- 最后我并行处理这些较小的行消息并实现更快的处理。
问题 - 一旦我的听众下载了文件,它就会发送 x no。到下一个 PubSubNo2 的行消息,但是当我检查它的订阅时,它显示了多于 x 条消息。 例如我上传了一个 6000 条记录的 csv,它在订阅者上显示了超过 40K-50K 条消息。
Package.json
"dependencies": {
"@google-cloud/pubsub": "1.5.0",
"axios": "^0.19.2",
"csv-parse": "^4.8.5",
"dotenv": "^8.2.0",
"google-gax": "1.14.1",
"googleapis": "47.0.0",
"moment": "^2.24.0",
"path": "^0.12.7",
"pg": "^7.18.1",
"winston": "^3.0.0"
}
出版商代码
async processFile(filename) {
let cnt = 0;
let index = null;
let rowCounter = 0;
const handler = (resolve, reject) => {
const parser = CsvParser({
delimiter: ',',
})
.on('readable', () => {
let row;
let hello = 0;
let busy = false;
this.meta.totalRows = (parser.info.records - 1);
while (row = parser.read()) {
if (cnt++ === 0) {
index = row;
continue;
}
let messageObject = {
customFieldsMap: this.customFieldsMap,
importAttributes: this.jc.attrs,
importColumnData: row,
rowCount: cnt,
importColumnList: index,
authToken: this.token
}
let topicPublishResult = PubSubPublish.publishToTopic(process.env.IMPORT_CSV_ROW_PUBLISHING_TOPIC, messageObject);
topicPublishResult.then((response) => {
rowCounter += 1;
const messageInfo = "Row " + rowCounter + " published" +
" | MessageId = " + response +
" | importId = " + this.data.importId +
" | fileId = " + this.data.fileId +
" | orgId = " + this.data.orgId;
console.info(messageInfo);
})
}
})
.on('end', () => {
console.log("File consumed!");
resolve(this.setStatus("queued"))
})
.on('error', reject);
fs.createReadStream(filename).pipe(parser);
};
await new Promise(handler);
}
并发布模块代码
const {
PubSub
} = require('@google-cloud/pubsub');
const pubsub = new PubSub({
projectId: process.env.PROJECT_ID
});
module.exports = {
publishToTopic: function(topicName, data) {
return pubsub.topic(topicName, {
batching: {
maxMessages: 500,
maxMilliseconds: 5000,
}
}).publish(Buffer.from(JSON.stringify(data)));
},
};
这对于文件 os 10、100,200,2000 条记录没有任何问题,但对于更多的 6K 记录会出现问题。 在我发布 6K 记录后,所有 6K 记录都出现 UnhandledPromiseRejection 错误,例如
(node:49994) UnhandledPromiseRejectionWarning: Error: Retry total timeout exceeded before any response was received
at repeat (/Users/tarungupta/office/import-processor/node_modules/google-gax/build/src/normalCalls/retries.js:65:31)
at Timeout._onTimeout (/Users/tarungupta/office/import-processor/node_modules/google-gax/build/src/normalCalls/retries.js:100:25)
at listOnTimeout (internal/timers.js:531:17)
at processTimers (internal/timers.js:475:7)
(node:49994) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). (rejection id: 6000)
感谢任何帮助!
当您有 6,000 条消息要发布时,您的发布者可能会不知所措。原因是您为在 publishToTopic
方法中创建的每条消息创建了发布者的新实例。因此,您无法利用任何批处理,而是等待 5 秒来发送每条消息。每条消息的开销很大。这可能意味着回调没有得到及时处理,导致超时并尝试重新发送。您希望一次性创建 pubsub.topic
对象,然后在发布调用中重复使用它。