为什么我的 channel.basicConsume 不等待消息
How comes my channel.basicConsume does not wait for messages
每当我启动以下代码时:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "direct_logs";
channel.exchangeDeclare(exchangeName, "direct");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchangeName, "red");
channel.basicQos(1);
final Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException{
String message = new String(body, "UTF-8");
System.out.println(message);
System.out.println("message received");
}
};
channel.basicConsume(queueName, true, consumer);
它不会像文档中暗示的那样开始无限循环。相反,它会立即停止。
我可以让它消耗一段时间的唯一方法是用循环替换 channel.basicConsume
,如下所示:
DateTime startedAt = new DateTime();
DateTime stopAt = startedAt.plusSeconds(60);
long i=0;
try {
while (stopAt.compareTo(new DateTime()) > 0) {
channel.basicConsume(queueName, true, consumer);
i++;
}
}finally {
System.out.println(new DateTime());
System.out.println(startedAt);
System.out.println(stopAt);
System.out.println(i);
}
一定有更好的方法来听一会儿消息,对吗?我错过了什么?
它会立即停止收听。
你确定它停止了吗? basicConsume
所做的是注册一个消费者来监听特定的队列,因此不需要在循环中执行它。你只执行一次,你传递的 Consumer
实例的 handleDelivery
方法将在消息到达时被调用。
rabbitmq 库创建的线程应防止 JVM 退出。为了退出程序,您实际上应该调用 connection.close()
这是来自 rabbitmq 的完整接收器示例:https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/java/Recv.java
其实和你的差不多。
我遇到了同样的问题。原因是我最后打电话给 connection.close。但是,basicConsume() 方法不会阻塞在当前线程上,而是阻塞在其他线程上,因此它后面的代码,即 connection.close() 会立即被调用。
每当我启动以下代码时:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "direct_logs";
channel.exchangeDeclare(exchangeName, "direct");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchangeName, "red");
channel.basicQos(1);
final Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException{
String message = new String(body, "UTF-8");
System.out.println(message);
System.out.println("message received");
}
};
channel.basicConsume(queueName, true, consumer);
它不会像文档中暗示的那样开始无限循环。相反,它会立即停止。
我可以让它消耗一段时间的唯一方法是用循环替换 channel.basicConsume
,如下所示:
DateTime startedAt = new DateTime();
DateTime stopAt = startedAt.plusSeconds(60);
long i=0;
try {
while (stopAt.compareTo(new DateTime()) > 0) {
channel.basicConsume(queueName, true, consumer);
i++;
}
}finally {
System.out.println(new DateTime());
System.out.println(startedAt);
System.out.println(stopAt);
System.out.println(i);
}
一定有更好的方法来听一会儿消息,对吗?我错过了什么? 它会立即停止收听。
你确定它停止了吗? basicConsume
所做的是注册一个消费者来监听特定的队列,因此不需要在循环中执行它。你只执行一次,你传递的 Consumer
实例的 handleDelivery
方法将在消息到达时被调用。
rabbitmq 库创建的线程应防止 JVM 退出。为了退出程序,您实际上应该调用 connection.close()
这是来自 rabbitmq 的完整接收器示例:https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/java/Recv.java
其实和你的差不多。
我遇到了同样的问题。原因是我最后打电话给 connection.close。但是,basicConsume() 方法不会阻塞在当前线程上,而是阻塞在其他线程上,因此它后面的代码,即 connection.close() 会立即被调用。