[RabbitMQ][AMQP] 无法使用 amqp_basic_get 和 amqp_read_message 获取并读取单条消息

[RabbitMQ][AMQP] Failing to get and read single message with amqp_basic_get and amqp_read_message

我想用 amqp 设置一个消费者,从特定队列中读取消息。经过一番搜索,我了解到这可以用 amqp_basic_get 完成;查阅文档后得知,实际的消息内容要用 amqp_read_message 读取。我还找到了这个示例(example),并试图参照它来实现 basic_get。尽管如此,我仍然无法从该队列获取并读取消息。

我的场景是这样的:我有两个程序,它们通过向 RabbitMQ 服务器发布消息并从中消费消息来进行通信。每个程序都声明了一个连接,其上有两个通道:一个用于消费,一个用于发布。信息流如下:程序 A 获取当前时间并发布到 RabbitMQ;程序 B 收到这条消息后,获取自己的当前时间,把它和接收到的时间打包成一条消息,再发布到 RabbitMQ;程序 A 应当消费这条消息。但是,我始终无法从 namedQueue 中成功读取消息。

程序 A(C++,使用 amqp.c)的实现如下:

                        ... after creating the connection
        // Setup excerpt: opens two channels on the existing connection `conn`,
        // declares the exchange, then declares, binds and subscribes to the
        // queue "namedQueue" on the consuming channel.

        //Create channels
        // channelIDPub is used for the exchange declaration below; channelIDSub is
        // used for every queue operation in this excerpt.
        amqp_channel_open_ok_t *res = amqp_channel_open(conn, channelIDPub);
        assert(res != NULL);
        amqp_channel_open_ok_t *res2 = amqp_channel_open(conn, channelIDSub);
        assert(res2 != NULL);
    
        //Declare exchange
        // The exchange name and type are set here; `exchange` and `exchangetype`
        // are declared outside this excerpt (used via .c_str(), so presumably
        // std::string — confirm).
        // NOTE(review): the four 0 flags are presumably passive, durable,
        // auto_delete and internal — confirm against the rabbitmq-c header.
        exchange = "exchangeName";
        exchangetype = "direct";
        amqp_exchange_declare(conn, channelIDPub, amqp_cstring_bytes(exchange.c_str()),
                              amqp_cstring_bytes(exchangetype.c_str()), 0, 0, 0, 0,
                              amqp_empty_table);
                                          ...
        // Check the broker's reply to the preceding RPC; `printText` is built in
        // the elided part of the excerpt.
        throw_on_amqp_error(amqp_get_rpc_reply(conn), printText.c_str());
    
        //Bind the exchange to the queue
        // The queue name is duplicated into a heap-allocated amqp_bytes_t and
        // reused for the declare, bind and consume calls below.
        // NOTE(review): the four 0 flags are presumably passive, durable,
        // exclusive and auto_delete — confirm; they must match whatever the
        // other program passes when it declares the same queue.
        const char* qname = "namedQueue";
        amqp_bytes_t queue = amqp_bytes_malloc_dup(amqp_cstring_bytes(qname));
        amqp_queue_declare_ok_t *r = amqp_queue_declare(
                conn, channelIDSub, queue, 0, 0, 0, 0, amqp_empty_table);
        throw_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue");
        
        // NOTE(review): this allocation check runs only after `queue` has already
        // been passed to amqp_queue_declare above.
        if (queue.bytes == NULL) {
            fprintf(stderr, "Out of memory while copying queue name");
            return;
        }
    
        // Route messages published to the exchange with `queueBindingKey`
        // (defined outside this excerpt) into the queue.
        amqp_queue_bind(conn, channelIDSub, queue, amqp_cstring_bytes(exchange.c_str()),
                        amqp_cstring_bytes(queueBindingKey.c_str()), amqp_empty_table);
        throw_on_amqp_error(amqp_get_rpc_reply(conn), "Binding queue");
    
        // Register a consumer on the queue with an empty (presumably
        // broker-assigned) consumer tag.
        // NOTE(review): the flags 0, 0, 1 are presumably no_local, no_ack and
        // exclusive — confirm. This subscribes a consumer to the same queue and
        // channel that the later code polls with amqp_basic_get; the broker may
        // deliver messages to this consumer instead of leaving them for the get —
        // verify whether both are intended.
        amqp_basic_consume(conn, channelIDSub, queue, amqp_empty_bytes, 0, 0, 1,
                           amqp_empty_table);
        throw_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming");
    
                                           // ...
        // In order to get a message from rabbitmq
        // Polls "namedQueue" with amqp_basic_get for up to one second, then reads
        // the message body with amqp_read_message and copies it into `payload`.
        // Returns false when no message was obtained or reading it failed.
        // NOTE(review): `res`/`res2` reuse the names of the channel-open results
        // in the setup excerpt; presumably this code lives in a different
        // function — confirm.
        amqp_rpc_reply_t res, res2;
        amqp_message_t message;
        amqp_boolean_t no_ack = false;
    
        amqp_maybe_release_buffers(conn);
        // NOTE(review): `queueName` is not declared in the visible excerpts (the
        // setup code names the queue `qname` / `queue`).
        printf("were here, with queue name %s, on channel %d\n", queueName, channelIDSub);
    
        // Build a deadline one second from now to bound the polling loop below.
        amqp_time_t deadline;
        struct timeval timeout = { 1 , 0 };//same timeout used in consume(json)
        int time_rc = amqp_time_from_now(&deadline, &timeout);
        assert(time_rc == AMQP_STATUS_OK);
    
        // Repeat basic.get while the broker answers "get-empty" and
        // amqp_time_has_past still reports AMQP_STATUS_OK (presumably meaning the
        // deadline has not been reached — confirm). The loop has no sleep, so it
        // re-issues the request back-to-back.
        do {
            res = amqp_basic_get(conn, channelIDSub, amqp_cstring_bytes("namedQueue"), no_ack);
        } while (res.reply_type == AMQP_RESPONSE_NORMAL &&
                res.reply.id == AMQP_BASIC_GET_EMPTY_METHOD 
                && amqp_time_has_past(deadline) == AMQP_STATUS_OK);
    
        // Anything other than a normal reply carrying basic.get-ok is treated as a
        // failure; the printed values are the reply type and the reply method id.
        if (AMQP_RESPONSE_NORMAL != res.reply_type || AMQP_BASIC_GET_OK_METHOD != res.reply.id)
        {
            printf("amqp_basic_get error codes amqp_response_normal %d, amqp_basic_get_ok_method %d\n", res.reply_type, res.reply.id);
            return false;
        }
    
        // NOTE(review): the message is read on `channelID`, while the get above
        // was issued on `channelIDSub` — verify these refer to the same channel.
        res2 = amqp_read_message(conn,channelID,&message,0);
        // Diagnostics are printed unconditionally, i.e. also when the read
        // succeeded.
        printf("error %s\n", amqp_error_string2(res2.library_error));
        printf("5:reply type %d\n", res2.reply_type);
    
        if (AMQP_RESPONSE_NORMAL != res2.reply_type) {
            printf("6:reply type %d\n", res2.reply_type);
            return false;
        }
    
        // Copy the body bytes (with explicit length) into `payload`, which is
        // declared outside this excerpt.
        payload = std::string(reinterpret_cast< char const * >(message.body.bytes), message.body.len);
    
        printf("then were here\n %s", payload.c_str());
        // Hand the message back to the library now that the body has been copied.
        amqp_destroy_message(&message);

程序 B(Python)如下:

        #!/usr/bin/env python3
        # Program B: consumes JSON messages bound with routing key 'fromB' and, for
        # each one received, publishes a JSON reply with routing key 'toB' on the
        # exchange 'exchangeName'. The reply carries this program's own timestamp
        # plus the "simAtTime" value taken from the received message.
        import pika
        import json
        from datetime import datetime, timezone
        import time
        import threading
    
        # State shared between the consumer callback and the publisher thread.
        cosimTime = 0.0      # last decoded message (replaced by a dict in callbackConsume)
        newData = False      # set by the callback, cleared by the publisher
        lock = threading.Lock()
        thread_stop = False  # set to True on Ctrl+C to end the publisher loop
    
        # Two separate connections: one used only for consuming, one only for
        # publishing (the publishing channel is used from the publisher thread).
        connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
        connectionPublish = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
        channelConsume = connection.channel()
        channelPublish = connectionPublish.channel()
    
        print("Declaring exchange")
        channelConsume.exchange_declare(exchange='exchangeName', exchange_type='direct')
        channelPublish.exchange_declare(exchange='exchangeName', exchange_type='direct')
    
        print("Creating queue")
        # Exclusive queue declared with an empty name; the name actually used is
        # read back from the declare result (presumably broker-generated).
        result = channelConsume.queue_declare(queue='', exclusive=True)
        queue_name = result.method.queue
        # The queue read by program A. The declaration flags here must agree with
        # the ones program A uses when it declares 'namedQueue'.
        result2 = channelPublish.queue_declare(queue='namedQueue', exclusive=False, auto_delete=False)
    
        # Incoming messages for this program arrive with routing key 'fromB'.
        channelConsume.queue_bind(exchange='exchangeName', queue=queue_name,
                       routing_key='fromB')
    
        # 'namedQueue' receives everything published with routing key 'toB', which
        # is the key publishRtime() publishes with.
        channelPublish.queue_bind(exchange='exchangeName', queue="namedQueue",
                       routing_key='toB')
    
        print(' [*] Waiting for logs. To exit press CTRL+C')
    
    # Consumer callback: decodes the body as JSON, stores it in cosimTime and
    # raises the newData flag, all while holding the lock.
    def callbackConsume(ch, method, properties, body):
        global newData, cosimTime
        print("\nReceived [x] %r" % body)
        #cosimTime = datetime.datetime.strptime(body, "%Y-%m-%dT%H:%M:%S.%f%z")
        with lock:
            newData = True
            cosimTime = body.decode()
            cosimTime = json.loads(cosimTime)
            #print(cosimTime)
    
    # Publisher thread body: loops until thread_stop is set and publishes one
    # reply each time newData has been raised by the callback.
    # NOTE(review): newData is tested outside the lock and the loop never sleeps
    # (the time.sleep call is commented out), so this thread spins while idle.
    def publishRtime():
        global newData
        while not thread_stop:
            if newData:
            #if True:
                with lock:
                    newData = False
                    msg = {}
                    
                    # Current time converted to the local timezone, ISO 8601 with
                    # millisecond precision.
                    msg['rtime'] = datetime.now(timezone.utc).astimezone().isoformat(timespec='milliseconds')
    
                    # Requires the received JSON to contain a "simAtTime" key.
                    msg['cosimtime'] = cosimTime["simAtTime"]
                    print("\nSending [y] %s" % str(msg))
                    channelPublish.basic_publish(exchange='exchangeName',
                                        routing_key='toB',
                                        body=json.dumps(msg))
                    #time.sleep(1)
    
    # Messages are auto-acknowledged on delivery.
    channelConsume.basic_consume(
        queue=queue_name, on_message_callback=callbackConsume, auto_ack=True)
    
    # Run the publisher in a background thread while the main thread blocks in
    # start_consuming(); Ctrl+C stops consuming and asks the publisher to exit.
    # NOTE(review): only `connection` is closed here; connectionPublish is not
    # closed and the publisher thread is not joined in the visible code.
    try:
        thread = threading.Thread(target = publishRtime)
        thread.start()
        channelConsume.start_consuming()
    except KeyboardInterrupt:
        print("Exiting...")
        channelConsume.stop_consuming()
        thread_stop = True
        connection.close()

程序A输出的是:

amqp_basic_get error codes amqp_response_normal 1, amqp_basic_get_ok_method 3932232

其中 3932232 是 AMQP_BASIC_GET_EMPTY_METHOD 的方法 ID。

程序 B 能够收到数据,并且在持续发布消息。

如果我稍微修改程序 B,让它始终发布一个固定的字符串,那么 amqp_basic_get 似乎能成功返回,但随后 amqp_read_message 会失败,返回的类型是 AMQP_RESPONSE_LIBRARY_EXCEPTION。

有人知道怎样才能让它正常工作吗?我是否遗漏了什么设置?

问题出在 queue_declare 上:两端声明同一个队列时使用的 auto_delete 参数不一致。