Spring 集成(手动确认)
Spring integration (Manual Acking)
我想通过 Spring 集成创建一个简单的 IntegrationFlow,但我遇到了困难。
我想创建一个集成流,从 Rabbit Mq 中的队列获取消息,然后 post 将消息发送到端点 Rest。我想根据 post 的结果手动 ack
。
集成流的典型行为如下:
- 我在队列中收到一条消息。
- Spring 检测到它,获取消息并 post 将其发送到 Rest 端点。
- 端点以 200 代码响应。
- Spring 集成确认消息。
如果端点以错误代码响应,我希望能够 nack 或重试。
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
RestTemplate restTemplate = new RestTemplate();
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.setQueueNames(BOUTIQUE_QUEUE_NAME);
/* Get Message from RabbitMQ */
return IntegrationFlows.from(Amqp.inboundAdapter(container))
.handle(msg ->
{
String msgString = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
HttpEntity<String> requestBody = new HttpEntity<String>(msgString, headers);
restTemplate.postForObject(ENDPOINT_LOCAL_URL, requestBody, String.class);
System.out.println(msgString);
})
.get();
您不需要为此用例使用手动确认模式;如果他 rest call returns 正常,容器会确认消息;如果抛出异常,容器将取消消息并重新传递。
如果您使用手动确认,Channel
和 deliveryTag
在 AmqpHeaders.CHANNEL
和 AmqpHeaders.DELIVERY_TAG
消息 headers 中可用,您可以调用 [=通道上的 14=] 或 basicReject
(您必须向入站适配器添加错误通道以处理错误。
我想通过 Spring 集成创建一个简单的 IntegrationFlow,但我遇到了困难。
我想创建一个集成流,从 Rabbit Mq 中的队列获取消息,然后 post 将消息发送到端点 Rest。我想根据 post 的结果手动 ack
。
集成流的典型行为如下:
- 我在队列中收到一条消息。
- Spring 检测到它,获取消息并 post 将其发送到 Rest 端点。
- 端点以 200 代码响应。
- Spring 集成确认消息。
如果端点以错误代码响应,我希望能够 nack 或重试。
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
RestTemplate restTemplate = new RestTemplate();
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.setQueueNames(BOUTIQUE_QUEUE_NAME);
/* Get Message from RabbitMQ */
return IntegrationFlows.from(Amqp.inboundAdapter(container))
.handle(msg ->
{
String msgString = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
HttpEntity<String> requestBody = new HttpEntity<String>(msgString, headers);
restTemplate.postForObject(ENDPOINT_LOCAL_URL, requestBody, String.class);
System.out.println(msgString);
})
.get();
您不需要为此用例使用手动确认模式;如果他 rest call returns 正常,容器会确认消息;如果抛出异常,容器将取消消息并重新传递。
如果您使用手动确认,Channel
和 deliveryTag
在 AmqpHeaders.CHANNEL
和 AmqpHeaders.DELIVERY_TAG
消息 headers 中可用,您可以调用 [=通道上的 14=] 或 basicReject
(您必须向入站适配器添加错误通道以处理错误。