amqp_basic_qos 没有任何影响

amqp_basic_qos not having any effect

我正在尝试使用 librabbitmq 编写一个简单的消费者代码。它正在工作,但是当我执行 amqp_basic_consume 时,它会占用整个队列。 我想要的是让它得到一条消息,处理它并重复。

我尝试使用 basic_qos 让消费者一次预取 1,但这似乎根本没有效果。

基本设置和循环: // 一次设置 1 条消息的 qos 如果 (!amqp_basic_qos(conn, channel, 0, 1, 0)) { die_on_amqp_error(amqp_get_rpc_reply(conn), "basic.qos"); }

// Consuming the message
amqp_basic_consume(conn, channel, queue, amqp_empty_bytes, no_local, no_ack, exclusive, amqp_empty_table);

while (run) {
    amqp_rpc_reply_t result;
    amqp_envelope_t envelope;

    amqp_maybe_release_buffers(conn);
    result = amqp_consume_message(conn, &envelope, &timeout, 0);

    if (AMQP_RESPONSE_NORMAL == result.reply_type) {

        strncpy(message, envelope.message.body.bytes, envelope.message.body.len);
        message[envelope.message.body.len] = '[=11=]';

        printf("Received message size: %d\nbody: -%s-\n", (int) envelope.message.body.len, message );

        if ( strncmp(message, "DONE",4 ) == 0 )
        {
            printf("XXXXXXXXXXXXXXXXXX Cease message received. XXXXXXXXXXXXXXXXXXXXX\n");
            run = 0;
        }
        amqp_destroy_envelope(&envelope);
    }else{
         printf("Timeout.\n");
         run = 0;
    }
}

我希望队列已满,我可以开始处理,如果我按 ^C,剩余的消息仍在队列中。相反,即使我只处理了一条消息,整个队列都被清空了。

这是 noAck 为真时的行为。将会发生的情况是,消息将以代理发送消息的最快速度推送到连接的消费者,因为它假定消费者能够接受它们,因为 它们在传递后立即得到确认。

您可能希望将 noAck 更改为 false,然后在这种情况下明确地 ack 每条消息返回给代理。

或者,您可以使用 basic.get 一次从代理中提取一条消息,而不是使用基于推送的消费者(有些人不喜欢这个想法)。您的用例将确定什么是最合适的,但基于您似乎有一个完整的队列和相当密集的进程消息这一事实,我认为 basic.get 在这种情况下就可以了。接下来的问题是决定队列为空时轮询的频率。