Return a job to the queue silently
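I have a RabbitMQ instance with a job queue defined, and I use it from Java through the Spring framework. I know that if I throw an exception somewhere in my code while processing a job received from the queue, the job is returned to the queue. But is there another way to return the job to the queue without throwing an exception, that is, to return it to the queue "manually"?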
The solution may depend on the abstraction you are using:
- Spring Cloud Stream uses Spring AMQP
- Spring AMQP uses the RabbitMQ Java client
- RabbitMQ Java Client
Spring Cloud Stream
I would use a Spring Cloud Stream Processor that processes messages and sets the routingKeyExpression.
Bindings:
=> theSourceQueue
=> myProcessor(message) consumes messages and sets the routing key as a header
=> DestinationExchange
    => route 1 => theSourceQueue
    => route 2 => ?
Spring AMQP
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import java.io.IOException;

@Component
public class Receiver {

    // Assumes the listener container runs in MANUAL acknowledge mode;
    // otherwise the container also acknowledges the delivery itself.
    @RabbitListener(queues = "my-messages")
    public void receiveMessage(String payload, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
        System.out.println("Received message: `" + payload + "`, deliveryTag: " + deliveryTag);
        // multiple = false: only this delivery; requeue = true: put it back on the queue
        channel.basicNack(deliveryTag, false, true);
    }
}
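Calling basicNack yourself only makes sense when the listener container is not acknowledging deliveries on your behalf. As a minimal sketch, assuming the application is a Spring Boot app that relies on the auto-configured simple listener container (the answer does not say so), manual acknowledgement could be switched on with a property like this:

```properties
# Assumption: Spring Boot auto-configuration with the default "simple" listener container
spring.rabbitmq.listener.simple.acknowledge-mode=manual
```

If the container is configured by hand instead, the equivalent setting would be the acknowledge mode on the container factory.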
RabbitMQ Java Client
You can also drop down to the lower level and use negative acknowledgement and re-queue:
This example rejects two messages with a single call to the broker (the second argument on basicNack is the multiple flag):
GetResponse gr1 = channel.basicGet("some.queue", false);
GetResponse gr2 = channel.basicGet("some.queue", false);
channel.basicNack(gr2.getEnvelope().getDeliveryTag(), true, true);
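To make the effect of the multiple flag concrete, here is a small in-memory model of which delivery tags a nack covers. It is only an illustration: NackModel and affectedTags are hypothetical names, nothing here talks to a broker or uses the RabbitMQ client, and it assumes the tags passed in are the ones still unacknowledged on the channel.

```java
import java.util.ArrayList;
import java.util.List;

// In-memory model only; hypothetical helper, not part of the RabbitMQ client.
class NackModel {

    // Returns the delivery tags a nack would cover, given the tags
    // that are still unacknowledged on the channel.
    static List<Long> affectedTags(List<Long> unacked, long deliveryTag, boolean multiple) {
        List<Long> affected = new ArrayList<>();
        for (long tag : unacked) {
            // multiple = true covers every unacked tag up to and including deliveryTag;
            // multiple = false covers only the exact tag.
            if (multiple ? tag <= deliveryTag : tag == deliveryTag) {
                affected.add(tag);
            }
        }
        return affected;
    }

    public static void main(String[] args) {
        List<Long> unacked = List.of(1L, 2L, 3L);
        System.out.println(affectedTags(unacked, 2L, true));  // prints [1, 2]
        System.out.println(affectedTags(unacked, 2L, false)); // prints [2]
    }
}
```

In the snippet above, passing the second message's tag with multiple set to true is what lets one call cover both fetched messages.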
When a message is requeued, it will be placed to its original position in its queue, if possible. If not (due to concurrent deliveries and acknowledgements from other consumers when multiple consumers share a queue), the message will be requeued to a position closer to queue head.