在 NodeJS 工作线程中执行网络 I/O 的问题
Issues performing network I/O in a NodeJS worker thread
我有一个脚本可以从服务器下载数千个文件,对这些文件执行一些 CPU 密集型计算,然后将结果上传到某个地方。作为增加的复杂程度,我想限制与我正在下载文件的服务器的并发连接数。
为了从事件线程中获取 CPU 密集型计算,我利用了 workerpool by josdejong。我还想我可以利用这样一个事实,即在任何给定时间只会启动有限数量的线程来限制与我的服务器的并发连接数,所以我尝试将网络 I/O 放在工作线程中像这样处理(TypeScript):
import Axios from "axios";
import workerpool from "workerpool";
const pool = workerpool.pool({
minWorkers: "max",
});
async function processData(file: string) {
console.log("Downloading " + file);
const csv = await Axios.request<IncomingMessage>({
method: "GET",
url: file,
responseType: "stream"
});
console.log(csv);
// TODO: Will process the file here
}
export default async function (files: string[]) {
const promiseArray: workerpool.Promise<Promise<void>>[] = [];
// Only processing the first file for now during testing
files.slice(0, 1).forEach((file) => {
promiseArray.push(pool.exec(processData, [file]));
});
await Promise.allSettled(promiseArray);
await pool.terminate();
}
当我编译和 运行 这段代码时,我看到消息“正在下载 test.txt”,但之后我没有看到以下日志语句 (console.log(csv)
)
我已尝试对这段代码进行各种修改,包括删除 responseType
、删除 await
以及仅检查 Axios 返回的 Promise
,使函数成为非异步的,等等。不管怎样似乎总是在 Axios.request
行
上崩溃
工作线程是否无法打开 HTTP 连接之类的?还是我犯了一个愚蠢的错误?
如果没有到达这行代码:
console.log(csv);
然后,要么 Axios.request()
从未履行其承诺,要么该承诺被拒绝。您在任何这些函数中都没有错误处理,因此如果它被拒绝,您将不知道也不会记录问题。作为初学者,我建议您检测代码,以便记录任何拒绝:
async function processData(file: string) {
try {
console.log("Downloading " + file);
const csv = await Axios.request<IncomingMessage>({
method: "GET",
url: file,
responseType: "stream"
});
console.log(csv);
} catch(e) {
console.log(e); // log an error
throw e; // propagate rejection/error
}
}
作为代码设计的一般要点,您应该在某种程度上捕获并记录任何可能的承诺拒绝。您不必在最低调用级别捕获它们,因为它们会通过返回的承诺向上传播,但您确实需要在某处捕获任何可能的拒绝,并且为了您自己的开发理智,您需要记录它以便您可以查看它何时发生以及错误是什么。
您不能在工作线程中执行 TypeScript。 pool.exec
方法接受静态 JavaScript 函数或具有相同函数的 JavaScript 文件的路径。
Note that both function and arguments must be static and stringifiable, as they need to be sent to the worker in a serialized form. In case of large functions or function arguments, the overhead of sending the data to the worker can be significant.
我正在尝试使用 TypeScript 进行这项工作。解决此问题的可能方法是:
- 在 TypeScript 中编写一个辅助函数,使用任何捆绑器将其编译为单独的捆绑包,然后将编译文件的路径传递给
pool.exec
。我设法完成了这项工作,但我唯一不满意的是,使用此解决方案你不能使用 nodemon(如果你使用它)
- 使用 JS 包装器编译 TS 源代码并使用 ts-node 执行它。然后将该包装器的路径传递给
pool.exec
函数。此解决方案不适用于捆绑器
我有一个脚本可以从服务器下载数千个文件,对这些文件执行一些 CPU 密集型计算,然后将结果上传到某个地方。作为增加的复杂程度,我想限制与我正在下载文件的服务器的并发连接数。
为了从事件线程中获取 CPU 密集型计算,我利用了 workerpool by josdejong。我还想我可以利用这样一个事实,即在任何给定时间只会启动有限数量的线程来限制与我的服务器的并发连接数,所以我尝试将网络 I/O 放在工作线程中像这样处理(TypeScript):
import Axios from "axios";
import workerpool from "workerpool";
const pool = workerpool.pool({
minWorkers: "max",
});
async function processData(file: string) {
console.log("Downloading " + file);
const csv = await Axios.request<IncomingMessage>({
method: "GET",
url: file,
responseType: "stream"
});
console.log(csv);
// TODO: Will process the file here
}
export default async function (files: string[]) {
const promiseArray: workerpool.Promise<Promise<void>>[] = [];
// Only processing the first file for now during testing
files.slice(0, 1).forEach((file) => {
promiseArray.push(pool.exec(processData, [file]));
});
await Promise.allSettled(promiseArray);
await pool.terminate();
}
当我编译和 运行 这段代码时,我看到消息“正在下载 test.txt”,但之后我没有看到以下日志语句 (console.log(csv)
)
我已尝试对这段代码进行各种修改,包括删除 responseType
、删除 await
以及仅检查 Axios 返回的 Promise
,使函数成为非异步的,等等。不管怎样似乎总是在 Axios.request
行
工作线程是否无法打开 HTTP 连接之类的?还是我犯了一个愚蠢的错误?
如果没有到达这行代码:
console.log(csv);
然后,要么 Axios.request()
从未履行其承诺,要么该承诺被拒绝。您在任何这些函数中都没有错误处理,因此如果它被拒绝,您将不知道也不会记录问题。作为初学者,我建议您检测代码,以便记录任何拒绝:
async function processData(file: string) {
try {
console.log("Downloading " + file);
const csv = await Axios.request<IncomingMessage>({
method: "GET",
url: file,
responseType: "stream"
});
console.log(csv);
} catch(e) {
console.log(e); // log an error
throw e; // propagate rejection/error
}
}
作为代码设计的一般要点,您应该在某种程度上捕获并记录任何可能的承诺拒绝。您不必在最低调用级别捕获它们,因为它们会通过返回的承诺向上传播,但您确实需要在某处捕获任何可能的拒绝,并且为了您自己的开发理智,您需要记录它以便您可以查看它何时发生以及错误是什么。
您不能在工作线程中执行 TypeScript。 pool.exec
方法接受静态 JavaScript 函数或具有相同函数的 JavaScript 文件的路径。
Note that both function and arguments must be static and stringifiable, as they need to be sent to the worker in a serialized form. In case of large functions or function arguments, the overhead of sending the data to the worker can be significant.
我正在尝试使用 TypeScript 进行这项工作。解决此问题的可能方法是:
- 在 TypeScript 中编写一个辅助函数,使用任何捆绑器将其编译为单独的捆绑包,然后将编译文件的路径传递给
pool.exec
。我设法完成了这项工作,但我唯一不满意的是,使用此解决方案你不能使用 nodemon(如果你使用它) - 使用 JS 包装器编译 TS 源代码并使用 ts-node 执行它。然后将该包装器的路径传递给
pool.exec
函数。此解决方案不适用于捆绑器