Rabbitmq - 新消费者没有收到任何东西
Rabbitmq - new consumer not receiving anything
我正在使用 rabbitMQ 将任务发送给在 运行 上创建的工作人员(消费者)。目前,每次创建新任务时,都会创建一个新的工作人员。问题是这样的:
-一个用户创建了一个任务
-创建一个worker然后将任务发送到队列中供worker处理
-worker开始处理队列(worker基本休眠了一段时间)
-另一个用户创建了一个任务
-创建了新的工作线程并在队列中发送了任务
-新的worker不处理新任务,同时什么也不做,新任务由第一个worker完成第一个任务后处理
我检查了 rabbitmq 的管理部分,有两个消费者绑定到队列,但其中一个似乎完成了所有工作,而另一个只是等待。
这是工作人员的代码:
public class 工人扩展线程 {
private final static String QUEUE_NAME = "Tasks";
private final static String QUEUE_COMPL = "Completed";
public static int id = 0;
private static final String EXCHANGE_NAME = "logs";
public int compteur;
String identifier;
public Worker() {
Worker.id++;
compteur = id;
}
public void run() {
try {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("WORKER " + compteur + " : Message received :" + message);
String taskName = message.split(" ")[0];
String length = message.split(" ")[1];
try {
System.out.println("WORKER " + compteur + " : Commencing job :" + message);
doWork(length);
System.out.println("WORKER " + compteur + " : Job's finished :" + message);
taskName += " done by " + compteur;
// confirm(taskName);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
System.out.println("WORKER " + compteur + " : Waiting for a new task...");
}
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
} catch (IOException ex) {
Logger.getLogger(Worker.class.getName()).log(Level.SEVERE, null, ex);
} catch (TimeoutException ex) {
Logger.getLogger(Worker.class.getName()).log(Level.SEVERE, null, ex);
}
}
private static void doWork(String taskLength) throws InterruptedException {
int temps = Integer.parseInt(taskLength);
Thread.sleep(temps);
}
}
以及将消息放入队列的部分代码:
public class serverSD {
private final static String QUEUE_NAME = "Tasks";
private Manager MANAGER = new Manager();
@WebMethod(operationName = "processTask")
public String processTask(@WebParam(name = "message") String txt, @WebParam(name = "duree") int duree) throws IOException, TimeoutException {
if (MANAGER == null){
MANAGER= new Manager();
MANAGER.listen();
}
System.out.println("SERVER : Message received : " + txt + " " + duree);
MANAGER.giveTask();
ConnectionFactory factory = new ConnectionFactory();
String message = txt + " " + duree;
System.out.println("SERVER : Sending message to workers : " + message);
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
channel.close();
connection.close();
return "Your task is being processed";
}
}
(经理是 class 随时随地创建工作人员。)
如果有人问过类似的问题但我找不到,我很抱歉。
感谢您提供任何可能的帮助:)
好的,我似乎找到了让它工作的方法,问题是我不知道它为什么会工作,所以如果有人知道,我很乐意得到解释
我将 channel.basicConsume(QUEUE_NAME, true, consumer);
的第二个参数更改为 false,但我不确定是否理解它为什么能解决我的问题。
basicConsume
方法的第二个参数是"auto acknowledge"。将此参数设置为 true
意味着消费者将在收到消息后立即告诉 RabbitMQ 消息已被确认。
当消费者设置为 autoAck true 时,它很可能会立即从队列中接收下一条可用消息,即使 basicQos
设置为 1。这种情况会发生,因为 1 限制立即被消费者递减,表示它不再有任何消息并且可以接受下一条消息。
将自动确认参数更改为 false
与基本 QoS 设置 1 结合使用可防止出现此问题,因为它会强制您的消费者说 "hey, i've got a message and i'm currently working on it. don't send me anything else until i'm done."
这允许其他消费者说 "hey, i have a spot open. go ahead and send me the message"
我正在使用 rabbitMQ 将任务发送给在 运行 上创建的工作人员(消费者)。目前,每次创建新任务时,都会创建一个新的工作人员。问题是这样的: -一个用户创建了一个任务
-创建一个worker然后将任务发送到队列中供worker处理
-worker开始处理队列(worker基本休眠了一段时间)
-另一个用户创建了一个任务
-创建了新的工作线程并在队列中发送了任务
-新的worker不处理新任务,同时什么也不做,新任务由第一个worker完成第一个任务后处理
我检查了 rabbitmq 的管理部分,有两个消费者绑定到队列,但其中一个似乎完成了所有工作,而另一个只是等待。
这是工作人员的代码: public class 工人扩展线程 {
private final static String QUEUE_NAME = "Tasks";
private final static String QUEUE_COMPL = "Completed";
public static int id = 0;
private static final String EXCHANGE_NAME = "logs";
public int compteur;
String identifier;
public Worker() {
Worker.id++;
compteur = id;
}
public void run() {
try {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("WORKER " + compteur + " : Message received :" + message);
String taskName = message.split(" ")[0];
String length = message.split(" ")[1];
try {
System.out.println("WORKER " + compteur + " : Commencing job :" + message);
doWork(length);
System.out.println("WORKER " + compteur + " : Job's finished :" + message);
taskName += " done by " + compteur;
// confirm(taskName);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
System.out.println("WORKER " + compteur + " : Waiting for a new task...");
}
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
} catch (IOException ex) {
Logger.getLogger(Worker.class.getName()).log(Level.SEVERE, null, ex);
} catch (TimeoutException ex) {
Logger.getLogger(Worker.class.getName()).log(Level.SEVERE, null, ex);
}
}
private static void doWork(String taskLength) throws InterruptedException {
int temps = Integer.parseInt(taskLength);
Thread.sleep(temps);
}
}
以及将消息放入队列的部分代码: public class serverSD {
private final static String QUEUE_NAME = "Tasks";
private Manager MANAGER = new Manager();
@WebMethod(operationName = "processTask")
public String processTask(@WebParam(name = "message") String txt, @WebParam(name = "duree") int duree) throws IOException, TimeoutException {
if (MANAGER == null){
MANAGER= new Manager();
MANAGER.listen();
}
System.out.println("SERVER : Message received : " + txt + " " + duree);
MANAGER.giveTask();
ConnectionFactory factory = new ConnectionFactory();
String message = txt + " " + duree;
System.out.println("SERVER : Sending message to workers : " + message);
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
channel.close();
connection.close();
return "Your task is being processed";
}
}
(经理是 class 随时随地创建工作人员。)
如果有人问过类似的问题但我找不到,我很抱歉。 感谢您提供任何可能的帮助:)
好的,我似乎找到了让它工作的方法,问题是我不知道它为什么会工作,所以如果有人知道,我很乐意得到解释
我将 channel.basicConsume(QUEUE_NAME, true, consumer);
的第二个参数更改为 false,但我不确定是否理解它为什么能解决我的问题。
basicConsume
方法的第二个参数是"auto acknowledge"。将此参数设置为 true
意味着消费者将在收到消息后立即告诉 RabbitMQ 消息已被确认。
当消费者设置为 autoAck true 时,它很可能会立即从队列中接收下一条可用消息,即使 basicQos
设置为 1。这种情况会发生,因为 1 限制立即被消费者递减,表示它不再有任何消息并且可以接受下一条消息。
将自动确认参数更改为 false
与基本 QoS 设置 1 结合使用可防止出现此问题,因为它会强制您的消费者说 "hey, i've got a message and i'm currently working on it. don't send me anything else until i'm done."
这允许其他消费者说 "hey, i have a spot open. go ahead and send me the message"