nodejs async.queue 从其他方法调用回调

nodejs async.queue call callback from other method

是否可以从 worker 函数以外的另一个函数调用作业的 finish 回调?因为我想做的是:

  1. 我收到一个 POST 请求,该请求被推送到 async.queue
  2. 提供给 async.queue 的 worker 函数发送然后通过 sockets 发送一些数据。
  3. 请求完成,当服务器也通过套接字收到回复时

以下是我的设置:

io.on('connection', function(client) {
    client.setMaxListeners(0);

    client.on('answer', (data) => {
        console.log(data);
        console.log(request_queue.workersList());

        returnResponse(data, request_queue.workersList()[0].data, request_queue.workersList()[0].callback);
    })
});

var request_queue = asyn.queue(requestHandler, 1);

app.post('/', function(request, response) {

    request_queue.push({req: request, res: response}, () => {
        console.log("pushed new request");
    });

    console.log(request_queue.length());

});

function requestHandler(req_res, finish) {

    // Do something with the data received in the request

    io.sockets.emit('update_img', JSON.stringify(data_to_send));

}

function returnResponse(data, res_req, finish) {
    data = JSON.parse(data);
    var res = res_req.res;

    var error = data.error;

    if (error) {
        console.log("I WAS HERE");
        error = {};
        error['error'] = 'There has been an error!'
        res.json(error);
        finish("There was an error");
    }

    // send back actual response

    res.json(data);
    console.log("Right before finish");
    finish();
}

不幸的是,这会产生以下错误:

GET / 304 3.845 ms - -
1
[Function]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
DATA_RECEIVED
[ { data: { req: [Object], res: [Object] },
    callback: [Function],
    next: null,
    prev: null } ]
Right before finish
pushed new request
POST / 200 7901.287 ms - 267
16
17
18
19
20
21
22
23
24
DATA_RECEIVED
[ { data: { req: [Object], res: [Object] },
    callback: [Function],
    next: null,
    prev: null } ]
_http_outgoing.js:489
    throw new Error('Can\'t set headers after they are sent.');
    ^

所以在我看来,worker 函数没有正确完成,否则响应对象会是不同的,对吗?我不确定我是否通过 request_queue.workersList()[0].callback.

正确传递了 worker 的回调函数

谁能帮帮我?

一定有更简单的方法来做到这一点。让我们添加一个全局回调映射,我们可以为每个进程存储一个回调。为此,我们需要通过共享的唯一 ID 来识别 socket/POST:

 const callbacks = new Map();

 io.on('connection', function(client) {
  client.setMaxListeners(0);

  client.on('answer', (data) => {
    console.log(data);     
    const cb = callbacks.get(data.id /*TODO*/);
    if(!cb) return client.emit("error","process with that id not found");
    cb(data);
    callbacks.delete(data.id);
  })
});

所以现在我们可以向只会调用一次的套接字添加回调。可以这样使用:

 app.post("/:id", function(req, res){
   callbacks.set( req.params.id, data => {
     res.json(data);
   });
});

如果您想在一定时间后终止 POST 请求(以防止 DOS 攻击),您可以使用简单的超时:

app.post("/:id", function(req, res){
   callbacks.set( req.params.id, data => {
     res.json(data);
   });
   //recycle after inactive time:
   setTimeout( id => {
     if( !callbacks.has(id) ) return; //all fine, user answered already
     callbacks.delete(id);
     res.json({ error: "no answer from socket"});
  }, 10000, req.params.id);
});