超时后重新发送消息
Resend messages after timeout
我有一个我放入 Spring AMQP 中的对象列表。对象来自控制器。有一个服务可以处理这些对象。此服务可能因 OutOfMemoryException 而崩溃。因此,我运行应用程序的几个实例。
有一个问题:当服务崩溃时,我丢失了收到的消息。我读到了 NACK。并且可以在 Exception 或 RuntimeException 的情况下使用它。但是我的服务因错误而崩溃。因此,我不能发送 NACK。可否在AMQP中设置一个超时时间,超时后如果我没有确认之前到达的消息会再次发送消息?
这是我写的代码:
public class Exchanges {
public static final String EXC_RENDER_NAME = "render.exchange.topic";
public static final TopicExchange EXC_RENDER = new TopicExchange(EXC_RENDER_NAME, true, false);
}
public class Queues {
public static final String RENDER_NAME = "render.queue.topic";
public static final Queue RENDER = new Queue(RENDER_NAME);
}
@RequiredArgsConstructor
@Service
public class RenderRabbitEventListener extends RabbitEventListener {
private final ApplicationEventPublisher eventPublisher;
@RabbitListener(bindings = @QueueBinding(value = @Queue(Queues.RENDER_NAME),
exchange = @Exchange(value = Exchanges.EXC_RENDER_NAME, type = "topic"),
key = "render.#")
)
public void onMessage(Message message, Channel channel) {
String routingKey = parseRoutingKey(message);
log.debug(String.format("Event %s", routingKey));
RenderQueueObject queueObject = parseRender(message, RenderQueueObject.class);
handleMessage(queueObject);
}
public void handleMessage(RenderQueueObject render) {
GenericSpringEvent<RenderQueueObject> springEvent = new GenericSpringEvent<>(render);
springEvent.setRender(true);
eventPublisher.publishEvent(springEvent);
}
}
这是发送消息的方法:
@Async ("threadPoolTaskExecutor")
@EventListener (condition = "# event.queue")
public void start (GenericSpringEvent <RenderQueueObject> event) {
RenderQueueObject renderQueueObject = event.getWhat ();
send (RENDER_NAME, renderQueueObject);
}
private void send(String routingKey, Object queue) {
try {
rabbitTemplate.convertAndSend(routingKey, objectMapper.writeValueAsString(queue));
} catch (JsonProcessingException e) {
log.warn("Can't send event!", e);
}
}
您需要关闭连接才能使消息重新排队。
最好在 OOME 后终止应用程序(这当然会关闭连接)。
我有一个我放入 Spring AMQP 中的对象列表。对象来自控制器。有一个服务可以处理这些对象。此服务可能因 OutOfMemoryException 而崩溃。因此,我运行应用程序的几个实例。
有一个问题:当服务崩溃时,我丢失了收到的消息。我读到了 NACK。并且可以在 Exception 或 RuntimeException 的情况下使用它。但是我的服务因错误而崩溃。因此,我不能发送 NACK。可否在AMQP中设置一个超时时间,超时后如果我没有确认之前到达的消息会再次发送消息?
这是我写的代码:
public class Exchanges {
public static final String EXC_RENDER_NAME = "render.exchange.topic";
public static final TopicExchange EXC_RENDER = new TopicExchange(EXC_RENDER_NAME, true, false);
}
public class Queues {
public static final String RENDER_NAME = "render.queue.topic";
public static final Queue RENDER = new Queue(RENDER_NAME);
}
@RequiredArgsConstructor
@Service
public class RenderRabbitEventListener extends RabbitEventListener {
private final ApplicationEventPublisher eventPublisher;
@RabbitListener(bindings = @QueueBinding(value = @Queue(Queues.RENDER_NAME),
exchange = @Exchange(value = Exchanges.EXC_RENDER_NAME, type = "topic"),
key = "render.#")
)
public void onMessage(Message message, Channel channel) {
String routingKey = parseRoutingKey(message);
log.debug(String.format("Event %s", routingKey));
RenderQueueObject queueObject = parseRender(message, RenderQueueObject.class);
handleMessage(queueObject);
}
public void handleMessage(RenderQueueObject render) {
GenericSpringEvent<RenderQueueObject> springEvent = new GenericSpringEvent<>(render);
springEvent.setRender(true);
eventPublisher.publishEvent(springEvent);
}
}
这是发送消息的方法:
@Async ("threadPoolTaskExecutor")
@EventListener (condition = "# event.queue")
public void start (GenericSpringEvent <RenderQueueObject> event) {
RenderQueueObject renderQueueObject = event.getWhat ();
send (RENDER_NAME, renderQueueObject);
}
private void send(String routingKey, Object queue) {
try {
rabbitTemplate.convertAndSend(routingKey, objectMapper.writeValueAsString(queue));
} catch (JsonProcessingException e) {
log.warn("Can't send event!", e);
}
}
您需要关闭连接才能使消息重新排队。
最好在 OOME 后终止应用程序(这当然会关闭连接)。