线程模块给出通道关闭错误
Threads module giving error of channel closed
我正在使用 npm 包 xlsx(见 lib/parserScripts/readExcel.js)
和 threads 模块来读取一个很大的 Excel 文件。
这第一次工作正常,但如果我同时上传另一个大文件,则会出现错误
Error: channel closed
at ChildProcess.target.send (internal/child_process.js:554:16)
at Worker.send (/app/node_modules/threads/lib/worker.node/worker.js:108:16)...
这可能是因为之前的线程仍在处理/尚未被杀死,因此当为另一个请求创建新池时,之前的线程仍在忙于处理。
如何解决这个问题?我是否必须在下面的代码中手动终止线程?如果是,应该怎么做?
index.js
// Excerpt from index.js: the `parseFile` task. The excerpt is truncated, so the
// bodies of both callbacks below are not closed here.
// NOTE(review): the ['fileHeaders', fn] form looks like an async.auto-style task
// that depends on `fileHeaders` — confirm against the full file.
parseFile: ['fileHeaders', (results, cb) => {
// Path of the worker script that is handed to the job manager.
const excelParserScript = __dirname + '/../lib/parserScripts/readExcel';
// Zero-based index into the workbook's sheet names that the worker will read.
const worksheetIndex = 3;
// Payload sent to the worker script.
// NOTE(review): `file.path` is not valid shorthand-property syntax; readExcel.js
// reads `input.path`, so presumably `path: file.path` was intended — confirm.
const params = {
file.path,
worksheetIndex
}
// using worker process
// result will be of the type {error: false, message: '', data: {}}
lib.miniWorker.bufferedJob(excelParserScript, params, (err, result) => {
lib/miniworker.js
const Threads = require('threads');
const Pool = Threads.Pool;
const workerPool = new Pool();
module.exports = class JobManager {
static bufferedJob(pathToScript, params, callback){
workerPool
.run(pathToScript)
.send(params)
.on('done', (result, input) => {
console.log(`Worker Job done: ${pathToScript} `);
callback(null, result);
})
.on('error', (job, error) => {
console.log(`Error in executing Worker Job: ${pathToScript}`);
callback(job || error);
})
}
}
lib/parserScripts/readExcel.js
module.exports = function(input, done) {
const XLSX = require('xlsx');
let workbook;
const path = input.path;
const worksheetIndex = input.worksheetIndex;
const expectedHeaders = input.expectedHeaders || [];
const options = {};
if (expectedHeaders.length > 0) {
options.header = expectedHeaders;
}
const response = {
error: false,
message: '',
data: {}
}
try {
workbook = XLSX.readFile(path, {});
const sheet = workbook['Sheets'][workbook.SheetNames[worksheetIndex]];
const headers = getHeaders(sheet);
const fileData = XLSX.utils.sheet_to_json(workbook['Sheets'][workbook.SheetNames[worksheetIndex]], options);
response.data = fileData;
response.headers = headers;
return done(response)
} catch (err) {
response.error = true;
response.messsage = 'Error in reading the file';
return done(response);
}
function getHeaders(sheet) {
var header = 0, offset = 1;
var hdr = [];
var o = {};
if (sheet == null || sheet["!ref"] == null) return [];
var range = o.range !== undefined ? o.range : sheet["!ref"];
var r;
if (o.header === 1) header = 1;
else if (o.header === "A") header = 2;
else if (Array.isArray(o.header)) header = 3;
switch (typeof range) {
case 'string':
r = safe_decode_range(range);
break;
case 'number':
r = safe_decode_range(sheet["!ref"]);
r.s.r = range;
break;
default:
r = range;
}
if (header > 0) offset = 0;
var rr = XLSX.utils.encode_row(r.s.r);
var cols = new Array(r.e.c - r.s.c + 1);
for (var C = r.s.c; C <= r.e.c; ++C) {
cols[C] = XLSX.utils.encode_col(C);
var val = sheet[cols[C] + rr];
switch (header) {
case 1:
hdr.push(C);
break;
case 2:
hdr.push(cols[C]);
break;
case 3:
hdr.push(o.header[C - r.s.c]);
break;
default:
if (val === undefined) continue;
hdr.push(XLSX.utils.format_cell(val));
}
}
return hdr;
}
function safe_decode_range(range) {
var o = {s: {c: 0, r: 0}, e: {c: 0, r: 0}};
var idx = 0, i = 0, cc = 0;
var len = range.length;
for (idx = 0; i < len; ++i) {
if ((cc = range.charCodeAt(i) - 64) < 1 || cc > 26) break;
idx = 26 * idx + cc;
}
o.s.c = --idx;
for (idx = 0; i < len; ++i) {
if ((cc = range.charCodeAt(i) - 48) < 0 || cc > 9) break;
idx = 10 * idx + cc;
}
o.s.r = --idx;
if (i === len || range.charCodeAt(++i) === 58) {
o.e.c = o.s.c;
o.e.r = o.s.r;
return o;
}
for (idx = 0; i != len; ++i) {
if ((cc = range.charCodeAt(i) - 64) < 1 || cc > 26) break;
idx = 26 * idx + cc;
}
o.e.c = --idx;
for (idx = 0; i != len; ++i) {
if ((cc = range.charCodeAt(i) - 48) < 0 || cc > 9) break;
idx = 10 * idx + cc;
}
o.e.r = --idx;
return o;
}
}
第一次运行正常,但如果我同时上传另一个大文件,则会出现错误。
您应该先用另一个文件名上传(例如 final01.xlsx),上传完成后再将其重命名为 final.xlsx。
原因是:文件仍在上传时,写入操作会锁定文件并不断更改其内容,因此读取无法完成。
如果上传文件意味着您正在 node.js 代码中同时读取另一个大文件,请忽略我的评论。
问题出在旧版本的 threads 模块上。升级到新版本,并改用其不基于事件的新 API,即可解决该问题。
https://github.com/andywer/threads.js/issues/164
但是,如果您想修正(旧版本的)基于事件的代码,则需要像下面这样做(在事件完成后终止线程)。
const Threads = require('threads');
const Pool = Threads.Pool;
module.exports = class JobManager {
static bufferedJob(pathToScript, params, callback){
let workerPool = new Pool();
workerPool
.run(pathToScript)
.send(params)
.on('done', (result, input) => {
console.log(`Worker Job done: ${pathToScript} `);
callback(null, result);
workerPool.killAll();
workerPool = null ;
})
.on('error', (job, error) => {
console.log(`Error in executing Worker Job: ${pathToScript}`);
callback(job || error);
workerPool.killAll();
workerPool = null ;
}).on('abort', (job, error)=>{
console.log(`Abort Worker Job: ${pathToScript}, Error : ${error}`);
callback(job || error);
workerPool.killAll();
workerPool = null ;
}).on('finished', ()=>{
console.log('Everything done, shutting down the thread pool.');
workerPool.killAll();
});
}
}
我正在使用 npm 包 xlsx(见 lib/parserScripts/readExcel.js)和 threads 模块来读取一个很大的 Excel 文件。
这第一次工作正常,但如果我同时上传另一个大文件,则会出现错误
Error: channel closed
at ChildProcess.target.send (internal/child_process.js:554:16)
at Worker.send (/app/node_modules/threads/lib/worker.node/worker.js:108:16)...
这可能是因为之前的线程仍在处理/尚未被杀死,因此当为另一个请求创建新池时,之前的线程仍在忙于处理。
如何解决这个问题?我是否必须在下面的代码中手动终止线程?如果是,应该怎么做?
index.js
// Excerpt from index.js: the `parseFile` task. The excerpt is truncated, so the
// bodies of both callbacks below are not closed here.
// NOTE(review): the ['fileHeaders', fn] form looks like an async.auto-style task
// that depends on `fileHeaders` — confirm against the full file.
parseFile: ['fileHeaders', (results, cb) => {
// Path of the worker script that is handed to the job manager.
const excelParserScript = __dirname + '/../lib/parserScripts/readExcel';
// Zero-based index into the workbook's sheet names that the worker will read.
const worksheetIndex = 3;
// Payload sent to the worker script.
// NOTE(review): `file.path` is not valid shorthand-property syntax; readExcel.js
// reads `input.path`, so presumably `path: file.path` was intended — confirm.
const params = {
file.path,
worksheetIndex
}
// using worker process
// result will be of the type {error: false, message: '', data: {}}
lib.miniWorker.bufferedJob(excelParserScript, params, (err, result) => {
lib/miniworker.js
const Threads = require('threads');
const Pool = Threads.Pool;
const workerPool = new Pool();
module.exports = class JobManager {
static bufferedJob(pathToScript, params, callback){
workerPool
.run(pathToScript)
.send(params)
.on('done', (result, input) => {
console.log(`Worker Job done: ${pathToScript} `);
callback(null, result);
})
.on('error', (job, error) => {
console.log(`Error in executing Worker Job: ${pathToScript}`);
callback(job || error);
})
}
}
lib/parserScripts/readExcel.js
module.exports = function(input, done) {
const XLSX = require('xlsx');
let workbook;
const path = input.path;
const worksheetIndex = input.worksheetIndex;
const expectedHeaders = input.expectedHeaders || [];
const options = {};
if (expectedHeaders.length > 0) {
options.header = expectedHeaders;
}
const response = {
error: false,
message: '',
data: {}
}
try {
workbook = XLSX.readFile(path, {});
const sheet = workbook['Sheets'][workbook.SheetNames[worksheetIndex]];
const headers = getHeaders(sheet);
const fileData = XLSX.utils.sheet_to_json(workbook['Sheets'][workbook.SheetNames[worksheetIndex]], options);
response.data = fileData;
response.headers = headers;
return done(response)
} catch (err) {
response.error = true;
response.messsage = 'Error in reading the file';
return done(response);
}
function getHeaders(sheet) {
var header = 0, offset = 1;
var hdr = [];
var o = {};
if (sheet == null || sheet["!ref"] == null) return [];
var range = o.range !== undefined ? o.range : sheet["!ref"];
var r;
if (o.header === 1) header = 1;
else if (o.header === "A") header = 2;
else if (Array.isArray(o.header)) header = 3;
switch (typeof range) {
case 'string':
r = safe_decode_range(range);
break;
case 'number':
r = safe_decode_range(sheet["!ref"]);
r.s.r = range;
break;
default:
r = range;
}
if (header > 0) offset = 0;
var rr = XLSX.utils.encode_row(r.s.r);
var cols = new Array(r.e.c - r.s.c + 1);
for (var C = r.s.c; C <= r.e.c; ++C) {
cols[C] = XLSX.utils.encode_col(C);
var val = sheet[cols[C] + rr];
switch (header) {
case 1:
hdr.push(C);
break;
case 2:
hdr.push(cols[C]);
break;
case 3:
hdr.push(o.header[C - r.s.c]);
break;
default:
if (val === undefined) continue;
hdr.push(XLSX.utils.format_cell(val));
}
}
return hdr;
}
function safe_decode_range(range) {
var o = {s: {c: 0, r: 0}, e: {c: 0, r: 0}};
var idx = 0, i = 0, cc = 0;
var len = range.length;
for (idx = 0; i < len; ++i) {
if ((cc = range.charCodeAt(i) - 64) < 1 || cc > 26) break;
idx = 26 * idx + cc;
}
o.s.c = --idx;
for (idx = 0; i < len; ++i) {
if ((cc = range.charCodeAt(i) - 48) < 0 || cc > 9) break;
idx = 10 * idx + cc;
}
o.s.r = --idx;
if (i === len || range.charCodeAt(++i) === 58) {
o.e.c = o.s.c;
o.e.r = o.s.r;
return o;
}
for (idx = 0; i != len; ++i) {
if ((cc = range.charCodeAt(i) - 64) < 1 || cc > 26) break;
idx = 26 * idx + cc;
}
o.e.c = --idx;
for (idx = 0; i != len; ++i) {
if ((cc = range.charCodeAt(i) - 48) < 0 || cc > 9) break;
idx = 10 * idx + cc;
}
o.e.r = --idx;
return o;
}
}
第一次运行正常,但如果我同时上传另一个大文件,则会出现错误。
您应该先用另一个文件名上传(例如 final01.xlsx),上传完成后再将其重命名为 final.xlsx。原因是:文件仍在上传时,写入操作会锁定文件并不断更改其内容,因此读取无法完成。如果“上传文件”指的是您在 node.js 代码中同时读取另一个大文件,请忽略我的评论。
问题出在旧版本的 threads 模块上。升级到新版本,并改用其不基于事件的新 API,即可解决该问题。
https://github.com/andywer/threads.js/issues/164
但是,如果您想修正(旧版本的)基于事件的代码,则需要像下面这样做(在事件完成后终止线程)。
const Threads = require('threads');
const Pool = Threads.Pool;
module.exports = class JobManager {
static bufferedJob(pathToScript, params, callback){
let workerPool = new Pool();
workerPool
.run(pathToScript)
.send(params)
.on('done', (result, input) => {
console.log(`Worker Job done: ${pathToScript} `);
callback(null, result);
workerPool.killAll();
workerPool = null ;
})
.on('error', (job, error) => {
console.log(`Error in executing Worker Job: ${pathToScript}`);
callback(job || error);
workerPool.killAll();
workerPool = null ;
}).on('abort', (job, error)=>{
console.log(`Abort Worker Job: ${pathToScript}, Error : ${error}`);
callback(job || error);
workerPool.killAll();
workerPool = null ;
}).on('finished', ()=>{
console.log('Everything done, shutting down the thread pool.');
workerPool.killAll();
});
}
}