Node JS:将响应对象传递给服务器端事件的 Bull 队列
Node JS: Passing Response Object to Bull Queue for Server Side Events
我陷入了架构决策。我有 Node + Express 应用程序,它有一个 API 来上传文件。上传完成后,响应关闭,上传的文件在 Bull Queue + Redis 的帮助下由 FFMPEG 批量处理。这种结构工作正常,但最近我开始测试服务器端事件,以向最终用户提供有关处理的更新。但是我无法将响应对象传递给 Bull Queue 以从服务器向用户写入定期更新。
1.进口
import childProcess from 'child_process';
import Bull from 'bull'
const Queue = new Bull('background_job', {redis: {port: process.env.port, host: process.env.host, password: process.env.password}});
2。上传功能
const uploadVideo = async(req, res) => {
try{
const result = await authUser(req);
const result2 = await checkUploadFile(result);
const result3 = await insertPost(result2, res);
await Queue.add(result3.data, result3.opts)
} catch(err){
res.status(403).send(err);
}
}
3。承诺
const authUser = (req) => {
return new Promise((resolve, reject) => {
//do some work
})
}
const checkUploadFile = (result) => {
return new Promise((resolve, reject) => {
//do some more work
})
}
const insertPost= (result, res) => {
return new Promise((resolve, reject) => {
//do final work
...........
//preparing server side events
const headers = {
'Content-Type': 'text/event-stream',
'Connection': 'keep-alive',
'Cache-Control': 'no-cache',
'Access-Control-Allow-Origin': '*'
};
res.writeHead(200, headers);
res.write(JSON.stringify({status: true, id: 1})); //testing server side events for the first time
//Let's continue to Bull
const data = {res: res} <- error here: TypeError: Converting circular structure to JSON
const opts = {removeOnComplete: true, removeOnFail: true}
resolve({data: data, opts: opts});
})
}
4.队列进程
Queue.process((job, done) => {
const res = job.data.res
childProcess.execFile('someScript.sh', [`some`, `arguments`], { stdio: ['pipe', 'pipe', 'ignore']}, (err, stderr, stdout) => {
if(err){
done(new Error("Failed: " + err))
res.write(JSON.stringify({status: true, id: 2})); //here using SSE
res.end()
} else {
done()
res.write(JSON.stringify({status: false})); //here using SSE
res.end()
}
})
})
5. PM2
记录的错误
TypeError: Converting circular structure to JSON
--> starting at object with constructor 'Socket'
| property 'parser' -> object with constructor 'HTTPParser'
--- property 'socket' closes the circle
我尝试使用 JSON.stringify(res)
将响应对象作为 JSON 传递,但这也不起作用。现在我正在考虑这种方法是否正确,或者我应该使用 Socket.io (这对于简单的服务器端事件来说是一种矫枉过正)
谢谢
你为什么要写这行:
const data = {res: res} <- error here: TypeError: Converting circular structure to JSON.
您仍然可以访问调用 insertPost 的 uploadVideo 函数中的响应对象。所以它可以简单地是:
await Queue.add(res, result3.opts).
例如:
const uploadVideo = async(req, res) => {
try{
const result = await authUser(req);
const result2 = await checkUploadFile(result);
const result3 = await insertPost(result2, res);
await Queue.add(res, result3.opts); // still have access to res
} catch(err){
res.status(403).send(err);
}
删除这一行:
const data = {res: res} <- error here: TypeError: Converting circular structure to JSON
只需使用响应
Queue.process((res, done) => {
//const res = job.data.res
childProcess.execFile('someScript.sh', [`some`, `arguments`], { stdio: ['pipe', 'pipe', 'ignore']}, (err, stderr, stdout) => {
if(err){
done(new Error("Failed: " + err))
res.write(JSON.stringify({status: true, id: 2})); //here using SSE
res.end()
} else {
done()
res.write(JSON.stringify({status: false})); //here using SSE
res.end()
}
})
});
编辑:
我明白你的意思了。看了一下 bull 模块。为什么你不能做这样的事情。
const uploadVideo = async(req, res) => {
try{
res.jobId = 0; // we need a way to know if job completed is our request const result = await authUser(req);
const result2 = await checkUploadFile(result);
const result3 = await insertPost(result2, res);
Queue.add({id: res.jobId, somedatafromresult3: 'result3.somedata' }, result3.opts);
Queue.on("completed", (err, data) => {
if (data.id === res.jobId) { // check to see if completed job is our one.
res.write(JSON.stringify(data)); //here using SSE
res.end()
}
console.log(data);
});
} catch(err){
res.status(403).send(err);
}
}
然后在您的处理函数中,只需 return 将要发出的数据。即
videoQueue.process(function(job, done){
childProcess.execFile('someScript.sh', [`some`, `arguments`], { stdio: ['pipe', 'pipe', 'ignore']}, (err, stderr, stdout) => {
if(err){
done(err, {status: true, id: job.data.id});
} else {
done(null, {status: false, id: job.data.id});
}
})
})
;
您可以使用 job.progress()
与通过 SSE 连接到客户端的路由进行通信。使用 job.progress(percent)
更新进度,传入一个数字。然后,Express 路由范围可以在此基础上旋转,并随着作业的进行向客户端发出 SSE 事件。
这是一个基本的 运行 可用示例作为概念证明,您可以将处理、错误处理和 job.progress
以及 SSE 逻辑添加到其中。
const express = require("express");
const fs = require("fs").promises;
const path = require("path");
const Queue = require("bull");
const sleep = (ms=1000) =>
new Promise(resolve => setTimeout(resolve, ms))
;
const queue = new Queue("test", process.env.REDIS_URL);
queue.process(4, async job => {
for (let i = 1; i <= job.data.seconds; i++) {
await job.progress(i / job.data.seconds * 100 | 0);
await sleep();
}
return Promise.resolve(`job ${job.id} complete!`);
});
const app = express();
app
.set("port", process.env.PORT || 5000)
.get("/", async (req, res) => {
try {
res.set({
"Access-Control-Allow-Origin": "*",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Content-Type": "text/event-stream",
});
res.flushHeaders();
const job = await queue.add({
seconds: Math.abs(+req.query.seconds) || 10,
});
let connected = true;
res.on("close", () => {
connected = false;
});
for (; connected; await sleep()) {
const j = await queue.getJob(job.id);
const progress = await j.progress();
res.write(`${progress}\n`);
if (progress >= 100) { // TODO handle job errors
break;
}
}
res.write(await job.finished());
}
catch (err) {
res.write(err.message);
}
finally {
res.end();
}
})
.listen(app.get("port"), () =>
console.log(`server listening on port ${app.get("port")}`)
)
;
样本运行:
$ curl localhost:5000
0
10
20
30
40
50
60
70
80
90
100
job 64 complete!
另请参阅 ,其中有一个可以读取响应流的示例客户端。
我陷入了架构决策。我有 Node + Express 应用程序,它有一个 API 来上传文件。上传完成后,响应关闭,上传的文件在 Bull Queue + Redis 的帮助下由 FFMPEG 批量处理。这种结构工作正常,但最近我开始测试服务器端事件,以向最终用户提供有关处理的更新。但是我无法将响应对象传递给 Bull Queue 以从服务器向用户写入定期更新。
1.进口
import childProcess from 'child_process';
import Bull from 'bull'
const Queue = new Bull('background_job', {redis: {port: process.env.port, host: process.env.host, password: process.env.password}});
2。上传功能
const uploadVideo = async(req, res) => {
try{
const result = await authUser(req);
const result2 = await checkUploadFile(result);
const result3 = await insertPost(result2, res);
await Queue.add(result3.data, result3.opts)
} catch(err){
res.status(403).send(err);
}
}
3。承诺
const authUser = (req) => {
return new Promise((resolve, reject) => {
//do some work
})
}
const checkUploadFile = (result) => {
return new Promise((resolve, reject) => {
//do some more work
})
}
const insertPost= (result, res) => {
return new Promise((resolve, reject) => {
//do final work
...........
//preparing server side events
const headers = {
'Content-Type': 'text/event-stream',
'Connection': 'keep-alive',
'Cache-Control': 'no-cache',
'Access-Control-Allow-Origin': '*'
};
res.writeHead(200, headers);
res.write(JSON.stringify({status: true, id: 1})); //testing server side events for the first time
//Let's continue to Bull
const data = {res: res} <- error here: TypeError: Converting circular structure to JSON
const opts = {removeOnComplete: true, removeOnFail: true}
resolve({data: data, opts: opts});
})
}
4.队列进程
Queue.process((job, done) => {
const res = job.data.res
childProcess.execFile('someScript.sh', [`some`, `arguments`], { stdio: ['pipe', 'pipe', 'ignore']}, (err, stderr, stdout) => {
if(err){
done(new Error("Failed: " + err))
res.write(JSON.stringify({status: true, id: 2})); //here using SSE
res.end()
} else {
done()
res.write(JSON.stringify({status: false})); //here using SSE
res.end()
}
})
})
5. PM2
记录的错误TypeError: Converting circular structure to JSON
--> starting at object with constructor 'Socket'
| property 'parser' -> object with constructor 'HTTPParser'
--- property 'socket' closes the circle
我尝试使用 JSON.stringify(res)
将响应对象作为 JSON 传递,但这也不起作用。现在我正在考虑这种方法是否正确,或者我应该使用 Socket.io (这对于简单的服务器端事件来说是一种矫枉过正)
谢谢
你为什么要写这行:
const data = {res: res} <- error here: TypeError: Converting circular structure to JSON.
您仍然可以访问调用 insertPost 的 uploadVideo 函数中的响应对象。所以它可以简单地是:
await Queue.add(res, result3.opts).
例如:
const uploadVideo = async(req, res) => {
try{
const result = await authUser(req);
const result2 = await checkUploadFile(result);
const result3 = await insertPost(result2, res);
await Queue.add(res, result3.opts); // still have access to res
} catch(err){
res.status(403).send(err);
}
删除这一行:
const data = {res: res} <- error here: TypeError: Converting circular structure to JSON
只需使用响应
Queue.process((res, done) => {
//const res = job.data.res
childProcess.execFile('someScript.sh', [`some`, `arguments`], { stdio: ['pipe', 'pipe', 'ignore']}, (err, stderr, stdout) => {
if(err){
done(new Error("Failed: " + err))
res.write(JSON.stringify({status: true, id: 2})); //here using SSE
res.end()
} else {
done()
res.write(JSON.stringify({status: false})); //here using SSE
res.end()
}
})
});
编辑:
我明白你的意思了。看了一下 bull 模块。为什么你不能做这样的事情。
const uploadVideo = async(req, res) => {
try{
res.jobId = 0; // we need a way to know if job completed is our request const result = await authUser(req);
const result2 = await checkUploadFile(result);
const result3 = await insertPost(result2, res);
Queue.add({id: res.jobId, somedatafromresult3: 'result3.somedata' }, result3.opts);
Queue.on("completed", (err, data) => {
if (data.id === res.jobId) { // check to see if completed job is our one.
res.write(JSON.stringify(data)); //here using SSE
res.end()
}
console.log(data);
});
} catch(err){
res.status(403).send(err);
}
}
然后在您的处理函数中,只需 return 将要发出的数据。即
videoQueue.process(function(job, done){
childProcess.execFile('someScript.sh', [`some`, `arguments`], { stdio: ['pipe', 'pipe', 'ignore']}, (err, stderr, stdout) => {
if(err){
done(err, {status: true, id: job.data.id});
} else {
done(null, {status: false, id: job.data.id});
}
})
})
;
您可以使用 job.progress()
与通过 SSE 连接到客户端的路由进行通信。使用 job.progress(percent)
更新进度,传入一个数字。然后,Express 路由范围可以在此基础上旋转,并随着作业的进行向客户端发出 SSE 事件。
这是一个基本的 运行 可用示例作为概念证明,您可以将处理、错误处理和 job.progress
以及 SSE 逻辑添加到其中。
const express = require("express");
const fs = require("fs").promises;
const path = require("path");
const Queue = require("bull");
const sleep = (ms=1000) =>
new Promise(resolve => setTimeout(resolve, ms))
;
const queue = new Queue("test", process.env.REDIS_URL);
queue.process(4, async job => {
for (let i = 1; i <= job.data.seconds; i++) {
await job.progress(i / job.data.seconds * 100 | 0);
await sleep();
}
return Promise.resolve(`job ${job.id} complete!`);
});
const app = express();
app
.set("port", process.env.PORT || 5000)
.get("/", async (req, res) => {
try {
res.set({
"Access-Control-Allow-Origin": "*",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Content-Type": "text/event-stream",
});
res.flushHeaders();
const job = await queue.add({
seconds: Math.abs(+req.query.seconds) || 10,
});
let connected = true;
res.on("close", () => {
connected = false;
});
for (; connected; await sleep()) {
const j = await queue.getJob(job.id);
const progress = await j.progress();
res.write(`${progress}\n`);
if (progress >= 100) { // TODO handle job errors
break;
}
}
res.write(await job.finished());
}
catch (err) {
res.write(err.message);
}
finally {
res.end();
}
})
.listen(app.get("port"), () =>
console.log(`server listening on port ${app.get("port")}`)
)
;
样本运行:
$ curl localhost:5000
0
10
20
30
40
50
60
70
80
90
100
job 64 complete!
另请参阅