在使用 mocha 和 supertest 发出 POST 请求后,我无法测试检查 rabbitmq 队列

I'm having trouble with testing checking a rabbitmq queue after a POST request with mocha and supertest

我对 mocha 和 supertest 有点陌生。现在我正在测试在我发出 post 请求后检查 RabbitMQ 队列,该请求将所述消息发送到交换。现在我有这个代码:

var app= null;
//function to bind the queue to the exchange
function bindQueue(callback){
  console.log('BIND STARTED');  
  conn.addListener('ready', function() {
    var queueName = 'testQueue';
    var queueOptions = {durable : false, autoDelete: true};
    conn.queue(queueName,queueOptions,function(queue) {
      queue.bind('exchange', '#');
      queue.subscribe({ack: true},callback.bind(null,queue));  
    });
  });  
}


describe('Testing publish messages to the worker after change of data(Add,Update,Delete)', function() {
  app = 'http://'+config.auth.basic.username+':'+config.auth.basic.password+'@'+ config.ifc + ':' + config.port;

  it("should receive a message after adding data",function(done){  
    //queue binding
    bindQueue(function (queue,message){
      console.log('RECIEVED MESSAGE');
      if(message.response._id == 2){
        queue.shift(true,false); //take it off  from the queue 
        console.log('IS RIGHT');    
      } else {
          throw "not the right message";
      }
      done();
    });

    supertest(app)
      .post(config.apiPrefix+'/$test_1/user/2')
      .send({
        'role': ['Student'],
        'suffix': '',
        'firstname': 'Peter',
        'lastname': 'Doe',
        'dateOfBirth': '1980-03-30',
        'gender':'Male',
         'email': 'petter@hgmail.com',
         'createDate': '2014-10-25',
         'status': 'active'
      })
      .expect(200)
      .end();      
  });
  it("should receive a message after updating data",function(done){  
      //queue binding
    bindQueue(function (queue,message){
      if(message.response._id == 2){
        console.log('MESAGE RECIEVED 2');
        queue.shift(true,false); //take it off  from the queue 
        console.log('IS RIGHT');    
      } else {
          throw "not the right message";
      }
      done();
    });
    supertest(app)
      .put(config.apiPrefix+'/$test_1/user/2')
      .send({
          'middleinitial':'M'
      })
      .expect(200)
      .end();
    });
});

当我 运行 一个测试(例如只有 post 一个)时,一切正常,调用队列回调并验证消息 ID。 当我尝试像上面的代码那样进行多个测试时,问题就开始了。当我 运行 测试时,它显示以下错误:

Testing publish messages to the worker after change of data(Add,Update,Delete)
BIND STARTED
RECIEVED MESSAGE
IS RIGHT
    ✓ should receive a message after adding data (266ms)
BIND STARTED
    1) should receive a message after updating data
RECIEVED MESSAGE
IS RIGHT
    2) should receive a message after adding data


  1 passing (453ms)
  2 failing

  1) Testing publish messages to the worker after change of data(Add,Update,Delete) should receive a message after updating data:
     Uncaught TypeError: Cannot read property 'call' of undefined
      at _stream_readable.js:908:16

  2) Testing publish messages to the worker after change of data(Add,Update,Delete) should receive a message after adding data:
     Error: done() called multiple times
      at Suite.<anonymous> (test/7-test-message-publish.js:36:3)
      at Object.<anonymous> (test/7-test-message-publish.js:33:1)
      at Array.forEach (native)
      at node.js:814:3

我已经尝试了多种方法,但我运行别无选择。有谁知道可能是什么问题?

我认为你有相当于内存泄漏...在这种情况下,它确实是消费者泄漏。您正在创建消费者并且永远不会取消订阅他们。

每次调用 bindQueue(function(){....}) 时,您都在向消息队列添加另一个使用者。但是您永远不会从队列中解除绑定或删除侦听器。

如果您 运行 测试一次,它会起作用,因为只有 1 个消费者收到消息。但是如果你 运行 2 次测试,它会失败,因为你总共添加了 2 个消费者,并且无法保证哪个消费者会收到发送的消息。您可能正在将消息发送给老消费者,最终尝试调用不再有效的代码,因为该测试已经被拆除。

每次测试完成后,您应该取消订阅队列。


评论更新:

您似乎在使用 postwait/node-amqp 库,对吧?要取消订阅单个消费者,您必须这样做:

```js connection.queue('foo', 函数(队列) {

var ctag;

队列 .subscribe(function(msg) {...}) .addCallback(函数(确定){ ctag = ok.consumerTag; });

// ...以及其他一些回调 queue.unsubscribe(ctag); }); ```

请注意正在存储的 addCallbackctag,以便您稍后可以取消订阅该特定消费者。


p.s。一般的 rabbitmq 社区建议不要使用这个特定的库,因为它有泄漏通道的历史。他们现在可能已经解决了这个问题……但是此时大多数节点/rabbitmq 都是使用 amqplib (amqp.node) 或 bramqp 完成的。