Rabbitmq - 如何在交换中收听消息
Rabbitmq - how to listen to messages on an exchange
我在 Java 中有一个程序可以将消息发送到 RabbitMQ。我只知道交易所名称。没有队列、绑定等。
我的问题是:在只知道交易所名称的情况下,如何查看程序是否发送成功?
谢谢。
此致,
塞尔维亚语
请看这里:https://www.rabbitmq.com/tutorials/tutorial-three-java.html
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "EXCHANGE_NAME", "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException
{
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
简而言之,您必须:
- 创建一个队列,在本例中为匿名队列
- 将队列绑定到您的交换
了解您拥有哪种 exchange
很重要,因为绑定可以在 fanout
或 topic
或 direct
之间更改
在这个例子中是fanout
您可以使用 RabbitMQ 启用 publisher confirmation。这就像有一个发送交易,RabbitMQ 会告诉你消息是否发送成功。
假设我们有 RabbitMQ Exchange,我们需要创建一个队列来将消息推送到交换并从队列中使用它,如下所示
private static final String EXCHANGE_NAME = "2022";
private static final String QUEUE_NAME = "2022";
private final static boolean durable = true;
// now we need to create a connection to rabbitmq server //
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
factory.setHost("127.0.0.1");
factory.setPort(5672);
Connection conn = factory.newConnection();
// create rabbitmq connection chaneel
Channel channel = conn.createChannel();
//Declare Exchange //
channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
// push message to rabbitmq exchange
channel.basicPublish(EXCHANGE_NAME, "routingkey", null, yourmessage.getBytes());
以上作为生产者工作现在我们需要创建队列消费者
private static final String EXCHANGE_NAME = "2022";
private static final String QUEUE_NAME = "2022";
private final static boolean durable = true;
// now we need to create a connection to rabbitmq server //
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
factory.setHost("127.0.0.1");
factory.setPort(5672);
Connection conn = factory.newConnection();
// create rabbitmq connection chaneel
Channel channel = conn.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
//Queue Declare //
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//Queue bind //
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routingkey");
// Queue Consume //
QueueingConsumer consumer = new QueueingConsumer(channel);
while (true)
{
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
}
我在 Java 中有一个程序可以将消息发送到 RabbitMQ。我只知道交易所名称。没有队列、绑定等。
我的问题是:在只知道交易所名称的情况下,如何查看程序是否发送成功?
谢谢。
此致, 塞尔维亚语
请看这里:https://www.rabbitmq.com/tutorials/tutorial-three-java.html
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "EXCHANGE_NAME", "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException
{
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
简而言之,您必须:
- 创建一个队列,在本例中为匿名队列
- 将队列绑定到您的交换
了解您拥有哪种 exchange
很重要,因为绑定可以在 fanout
或 topic
或 direct
在这个例子中是fanout
您可以使用 RabbitMQ 启用 publisher confirmation。这就像有一个发送交易,RabbitMQ 会告诉你消息是否发送成功。
假设我们有 RabbitMQ Exchange,我们需要创建一个队列来将消息推送到交换并从队列中使用它,如下所示
private static final String EXCHANGE_NAME = "2022"; private static final String QUEUE_NAME = "2022"; private final static boolean durable = true; // now we need to create a connection to rabbitmq server // ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); factory.setVirtualHost("/"); factory.setHost("127.0.0.1"); factory.setPort(5672); Connection conn = factory.newConnection(); // create rabbitmq connection chaneel Channel channel = conn.createChannel(); //Declare Exchange // channel.exchangeDeclare(EXCHANGE_NAME, "topic", true); // push message to rabbitmq exchange channel.basicPublish(EXCHANGE_NAME, "routingkey", null, yourmessage.getBytes());
以上作为生产者工作现在我们需要创建队列消费者
private static final String EXCHANGE_NAME = "2022"; private static final String QUEUE_NAME = "2022"; private final static boolean durable = true; // now we need to create a connection to rabbitmq server // ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); factory.setVirtualHost("/"); factory.setHost("127.0.0.1"); factory.setPort(5672); Connection conn = factory.newConnection(); // create rabbitmq connection chaneel Channel channel = conn.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic", true); //Queue Declare // channel.queueDeclare(QUEUE_NAME, true, false, false, null); //Queue bind // channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routingkey"); // Queue Consume // QueueingConsumer consumer = new QueueingConsumer(channel); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); }