保证异步传递消息
Guaranteed message delivery asynchronously
我写了一个代码来保证消息的传递和处理。但它在一个线程中工作。
如何重构代码以使其在 并行线程或异步 中工作?在这种情况下,即使应用程序崩溃,也必须保证消息能够被传递。它们将在应用程序重新启动后或在该应用程序的其他 运行 个实例的帮助下交付。
制作人:
@Async("threadPoolTaskExecutor")
@EventListener(condition = "#event.queue")
public void start(GenericSpringEvent<RenderQueueObject> event) {
RenderQueueObject renderQueueObject = event.getWhat();
send(RENDER_NAME, renderQueueObject);
}
private void send(String routingKey, Object queue) {
try {
log.info("SEND message");
rabbitTemplate.convertAndSend(routingKey, objectMapper.writeValueAsString(queue));
} catch (JsonProcessingException e) {
log.warn("Can't send event!", e);
}
}
消费者
@Slf4j
@RequiredArgsConstructor
@Service
public class RenderRabbitEventListener extends RabbitEventListener {
private final ApplicationEventPublisher eventPublisher;
@RabbitListener(bindings = @QueueBinding(value = @Queue(Queues.RENDER_NAME),
exchange = @Exchange(value = Exchanges.EXC_RENDER_NAME, type = "topic"),
key = "render.#")
)
public void onMessage(Message message, Channel channel) {
String routingKey = parseRoutingKey(message);
log.debug(String.format("Event %s", routingKey));
RenderQueueObject queueObject = parseRender(message, RenderQueueObject.class);
handleMessage(queueObject);
}
public void handleMessage(RenderQueueObject render) {
GenericSpringEvent<RenderQueueObject> springEvent = new GenericSpringEvent<>(render);
springEvent.setRender(true);
eventPublisher.publishEvent(springEvent);
}
}
public class Exchanges {
public static final String EXC_RENDER_NAME = "render.exchange.topic";
public static final TopicExchange EXC_RENDER = new TopicExchange(EXC_RENDER_NAME, true, false);
}
public class Queues {
public static final String RENDER_NAME = "render.queue.topic";
public static final Queue RENDER = new Queue(RENDER_NAME);
}
所以我的消息被处理了。如果我加上@Async,那么就会并行处理,但是如果应用程序崩溃了,那么在重新开始的时候,就不会再发送消息了。
@EventListener(condition = "#event.render")
public void startRender(GenericSpringEvent<RenderQueueObject> event) {
RenderQueueObject render = event.getWhat();
storageService.updateDocument(
render.getGuid(),
new Document("$set", new Document("dateStartRendering", new Date()).append("status", State.rendering.toString()))
);
Future<RenderWorkObject> submit = taskExecutor.submit(new RenderExecutor(render));
try {
completeResult(submit);
} catch (IOException | ExecutionException | InterruptedException e) {
log.info("Error when complete results after invoke executors");
}
}
private void completeResult(Future<RenderWorkObject> renderFuture) throws IOException, ExecutionException, InterruptedException {
RenderWorkObject renderWorkObject = renderFuture.get();
State currentState = renderWorkObject.getState();
if (Stream.of(result, error, cancel).anyMatch(isEqual(currentState))) {
storageService.updateDocument(renderWorkObject.getGuidJob(), new Document("$set", toUpdate));
}
}
我尝试自定义配置以满足我的需要。但是没有用:
@Bean
Queue queue() {
return Queues.RENDER;
}
@Bean
TopicExchange exchange() {
return Exchanges.EXC_RENDER;
}
@Bean
Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(Queues.RENDER_NAME);
}
@Bean
public RabbitTemplate rabbitTemplate(@Qualifier("defaultConnectionFactory") ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
return template;
}
@Bean
public SimpleMessageListenerContainer container(@Qualifier("defaultConnectionFactory") ConnectionFactory connectionFactory, RabbitEventListener listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueueNames(Queues.RENDER_NAME);
container.setQueues(Queues.RENDER);
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(20);
container.setConcurrentConsumers(10);
container.setPrefetchCount(1000);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return container;
}
@Bean
public ConnectionFactory defaultConnectionFactory() {
CachingConnectionFactory cf = new CachingConnectionFactory();
cf.setAddresses("127.0.0.1:5672");
cf.setUsername("guest");
cf.setPassword("guest");
cf.setVirtualHost("/");
cf.setPublisherConfirms(true);
cf.setPublisherReturns(true);
cf.setChannelCacheSize(25);
ExecutorService es = Executors.newFixedThreadPool(20);
cf.setExecutor(es);
return cf;
}
如有任何想法,我将不胜感激
我想我找到了解决办法。我更改了 RenderRabbitEventListener,以便在发生崩溃时如果从 Rabbit 收到消息,它会再次将消息发送到队列。因此,我的消费者将始终并行工作。这将在所有节点发生故障的情况下以及在一个节点发生故障的情况下并行工作。
以下是我所做的更改:
@RabbitListener(bindings = @QueueBinding(value = @Queue(Queues.RENDER_NAME),
exchange = @Exchange(value = Exchanges.EXC_RENDER_NAME, type = "topic"),
key = "render.#")
)
public void onMessage(Message message, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag
) {
RenderQueueObject queueObject = parseRender(message, RenderQueueObject.class);
if (message.getMessageProperties().isRedelivered()) {
log.info("Message Redelivered, try also");
try {
channel.basicAck(tag, false);
MessageConverter messageConverter = rabbitTemplate.getMessageConverter();
String valueAsString = parseBody(message);
Message copyMessage = messageConverter.toMessage(valueAsString, new MessageProperties());
rabbitTemplate.convertAndSend(
message.getMessageProperties().getReceivedRoutingKey(),
copyMessage);
return;
} catch (IOException e) {
log.info("basicAck exception");
}
}
log.info("message not redelievered");
String routingKey = parseRoutingKey(message);
log.debug(String.format("Event %s", routingKey));
handleMessage(queueObject);
}
我写了一个代码来保证消息的传递和处理。但它在一个线程中工作。 如何重构代码以使其在 并行线程或异步 中工作?在这种情况下,即使应用程序崩溃,也必须保证消息能够被传递。它们将在应用程序重新启动后或在该应用程序的其他 运行 个实例的帮助下交付。
制作人:
@Async("threadPoolTaskExecutor")
@EventListener(condition = "#event.queue")
public void start(GenericSpringEvent<RenderQueueObject> event) {
RenderQueueObject renderQueueObject = event.getWhat();
send(RENDER_NAME, renderQueueObject);
}
private void send(String routingKey, Object queue) {
try {
log.info("SEND message");
rabbitTemplate.convertAndSend(routingKey, objectMapper.writeValueAsString(queue));
} catch (JsonProcessingException e) {
log.warn("Can't send event!", e);
}
}
消费者
@Slf4j
@RequiredArgsConstructor
@Service
public class RenderRabbitEventListener extends RabbitEventListener {
private final ApplicationEventPublisher eventPublisher;
@RabbitListener(bindings = @QueueBinding(value = @Queue(Queues.RENDER_NAME),
exchange = @Exchange(value = Exchanges.EXC_RENDER_NAME, type = "topic"),
key = "render.#")
)
public void onMessage(Message message, Channel channel) {
String routingKey = parseRoutingKey(message);
log.debug(String.format("Event %s", routingKey));
RenderQueueObject queueObject = parseRender(message, RenderQueueObject.class);
handleMessage(queueObject);
}
public void handleMessage(RenderQueueObject render) {
GenericSpringEvent<RenderQueueObject> springEvent = new GenericSpringEvent<>(render);
springEvent.setRender(true);
eventPublisher.publishEvent(springEvent);
}
}
public class Exchanges {
public static final String EXC_RENDER_NAME = "render.exchange.topic";
public static final TopicExchange EXC_RENDER = new TopicExchange(EXC_RENDER_NAME, true, false);
}
public class Queues {
public static final String RENDER_NAME = "render.queue.topic";
public static final Queue RENDER = new Queue(RENDER_NAME);
}
所以我的消息被处理了。如果我加上@Async,那么就会并行处理,但是如果应用程序崩溃了,那么在重新开始的时候,就不会再发送消息了。
@EventListener(condition = "#event.render")
public void startRender(GenericSpringEvent<RenderQueueObject> event) {
RenderQueueObject render = event.getWhat();
storageService.updateDocument(
render.getGuid(),
new Document("$set", new Document("dateStartRendering", new Date()).append("status", State.rendering.toString()))
);
Future<RenderWorkObject> submit = taskExecutor.submit(new RenderExecutor(render));
try {
completeResult(submit);
} catch (IOException | ExecutionException | InterruptedException e) {
log.info("Error when complete results after invoke executors");
}
}
private void completeResult(Future<RenderWorkObject> renderFuture) throws IOException, ExecutionException, InterruptedException {
RenderWorkObject renderWorkObject = renderFuture.get();
State currentState = renderWorkObject.getState();
if (Stream.of(result, error, cancel).anyMatch(isEqual(currentState))) {
storageService.updateDocument(renderWorkObject.getGuidJob(), new Document("$set", toUpdate));
}
}
我尝试自定义配置以满足我的需要。但是没有用:
@Bean
Queue queue() {
return Queues.RENDER;
}
@Bean
TopicExchange exchange() {
return Exchanges.EXC_RENDER;
}
@Bean
Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(Queues.RENDER_NAME);
}
@Bean
public RabbitTemplate rabbitTemplate(@Qualifier("defaultConnectionFactory") ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
return template;
}
@Bean
public SimpleMessageListenerContainer container(@Qualifier("defaultConnectionFactory") ConnectionFactory connectionFactory, RabbitEventListener listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueueNames(Queues.RENDER_NAME);
container.setQueues(Queues.RENDER);
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(20);
container.setConcurrentConsumers(10);
container.setPrefetchCount(1000);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return container;
}
@Bean
public ConnectionFactory defaultConnectionFactory() {
CachingConnectionFactory cf = new CachingConnectionFactory();
cf.setAddresses("127.0.0.1:5672");
cf.setUsername("guest");
cf.setPassword("guest");
cf.setVirtualHost("/");
cf.setPublisherConfirms(true);
cf.setPublisherReturns(true);
cf.setChannelCacheSize(25);
ExecutorService es = Executors.newFixedThreadPool(20);
cf.setExecutor(es);
return cf;
}
如有任何想法,我将不胜感激
我想我找到了解决办法。我更改了 RenderRabbitEventListener,以便在发生崩溃时如果从 Rabbit 收到消息,它会再次将消息发送到队列。因此,我的消费者将始终并行工作。这将在所有节点发生故障的情况下以及在一个节点发生故障的情况下并行工作。
以下是我所做的更改:
@RabbitListener(bindings = @QueueBinding(value = @Queue(Queues.RENDER_NAME),
exchange = @Exchange(value = Exchanges.EXC_RENDER_NAME, type = "topic"),
key = "render.#")
)
public void onMessage(Message message, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag
) {
RenderQueueObject queueObject = parseRender(message, RenderQueueObject.class);
if (message.getMessageProperties().isRedelivered()) {
log.info("Message Redelivered, try also");
try {
channel.basicAck(tag, false);
MessageConverter messageConverter = rabbitTemplate.getMessageConverter();
String valueAsString = parseBody(message);
Message copyMessage = messageConverter.toMessage(valueAsString, new MessageProperties());
rabbitTemplate.convertAndSend(
message.getMessageProperties().getReceivedRoutingKey(),
copyMessage);
return;
} catch (IOException e) {
log.info("basicAck exception");
}
}
log.info("message not redelievered");
String routingKey = parseRoutingKey(message);
log.debug(String.format("Event %s", routingKey));
handleMessage(queueObject);
}