使用 Qpid Proton 写入 Qpid Java 代理的简单示例不起作用

Simple example writing to the Qpid Java broker with Qpid Proton not working

我是 AMQP 的新手,正在尝试编写一个简单的应用程序,使用 Qpid Proton Messenger API 写入 Qpid Java 代理。开箱即用的 Qpid Java 经纪人有四个默认交易所(amq.match、amq.fanout、amq.topic、amq.direct),一个 AMQP "port"端口 5672 和 passwordFile auth 提供程序。为了消除此测试的安全性,我将身份验证提供程序更改为匿名。

为了给经纪人写信,我遵循 this 示例。该示例没有显示如何写入特定的交换器或队列,我认为我的问题出在该域的某个地方。这是我的精简版。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <ctype.h>
#include <Windows.h>
#include "proton\message.h"
#include "proton\messenger.h"

int main(int argc, const char* argv[])
{
    while (true)
    {
        pn_message_t * message;
        pn_messenger_t * messenger;
        pn_data_t * body;

        message = pn_message();
        messenger = pn_messenger(NULL);

        pn_messenger_start(messenger);

        printf("set address result: %i\n", pn_message_set_address(message, "amqp://xxx.xxx.xxx.xxx:5672"));

        body = pn_message_body(message);

        char* msgtext = "howdy";
        pn_data_put_string(body, pn_bytes(strlen(msgtext), msgtext));
        pn_messenger_put(messenger, message);

        printf("\nSent: %i\n", pn_messenger_send(messenger, 1));

        pn_messenger_stop(messenger);
        pn_messenger_free(messenger);
        pn_message_free(message);

        Sleep(10);
    }
}

pn_messenger_send returns 0(成功)。从客户端机器上我可以看到客户端正在通过线路发送消息。但是,在代理管理门户中,它显示零客户端连接,默认虚拟主机显示 0 msg/s (0.00 B/s) 入站。我希望这能显示来自我的测试客户端的字节。如果我 运行 客户端与代理关闭或将其指向错误的端口,客户端将在 pn_messenger_send 调用中失败,所以我知道我至少在与代理交谈。

我的问题是这些消息要去哪里?如何在消息的连接字符串中定义交换类型和队列?我找了又找,什么也没找到。欢迎任何我可能错过的文档或教程链接。

这里有一些代理配置截图供参考。

谢谢!

如果您尝试将消息发送到特定队列,您的地址需要采用以下格式:

amqp://host:port/test-queue-1

您还可以向交易所发送消息:

amqp://host:port/test-exchange-1

在您的情况下,您发送的邮件没有在地址中指定目的地。所以消息会转到默认的直接交换。所有队列都绑定到默认的direct,队列名作为绑定键。但是您也没有在消息上设置路由键。它们很可能因为不可路由而被丢弃——我不记得 qpid 是否有死信队列实现。

编辑:Unrouteable messages could be discarded

为了在您的消息中指定路由键,请使用:

pn_message_set_subject(message, "my-routing-key");

如果您考虑了自定义交换和队列设置,则必须预定义交换和队列绑定以确保正确路由消息。我想象存在 c-bindings 以编程方式定义和配置交换,但我不熟悉它。

您可以在命令行上使用 qpid-config 命令创建交换器、队列和绑定,或者通过 webadmin 应用程序使用 REST API 例如:

使用 qpid-config 创建持久队列:

qpid-config add queue test-queue-1 --durable 

或 REST API

#create a durable queue
curl -X PUT  -d '{"durable":true}' http://localhost:8080/rest/queue/<vhostname>/test-queue-1

创建交易所

qpid-config add exchange direct test-exchange-1 --durable

正在创建队列和交换器之间的绑定:

qpid-config bind test-exchange-1 test-queue-1 test-queue-binding-key

您使用路由键 test-queue-binding-key 发送到交换器的任何消息最终都会在 test-queue-1 中结束。

参考:qpid-config
参考:REST API