Spring 集成(手动确认)

Spring integration (Manual Acking)

我想通过 Spring 集成创建一个简单的 IntegrationFlow,但我遇到了困难。

我想创建一个集成流,从 Rabbit Mq 中的队列获取消息,然后 post 将消息发送到端点 Rest。我想根据 post 的结果手动 ack

集成流的典型行为如下:

  1. 我在队列中收到一条消息。
  2. Spring 检测到它,获取消息并 post 将其发送到 Rest 端点。
  3. 端点以 200 代码响应。
  4. Spring 集成确认消息。

如果端点以错误代码响应,我希望能够 nack 或重试。

HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
RestTemplate restTemplate = new RestTemplate();
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.setQueueNames(BOUTIQUE_QUEUE_NAME);

/* Get Message from RabbitMQ */
return IntegrationFlows.from(Amqp.inboundAdapter(container)) 
    .handle(msg ->
    {
        String msgString = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
        HttpEntity<String> requestBody = new HttpEntity<String>(msgString, headers);
        restTemplate.postForObject(ENDPOINT_LOCAL_URL, requestBody, String.class);
        System.out.println(msgString);
    })
    .get();

您不需要为此用例使用手动确认模式;如果他 rest call returns 正常,容器会确认消息;如果抛出异常,容器将取消消息并重新传递。

如果您使用手动确认,ChanneldeliveryTagAmqpHeaders.CHANNELAmqpHeaders.DELIVERY_TAG 消息 headers 中可用,您可以调用 [=通道上的 14=] 或 basicReject(您必须向入站适配器添加错误通道以处理错误。