Spring Cloud Stream - 功能 - 如何手动确认rabbitmq消息?

Spring Cloud Stream - Functions - How to manually acknowledge rabbitmq message?

我正在使用带有 rabbitbinder 的 spring 云流。

使用@StreamListener,我可以通过将 Channel 和 deliveryTag 注入到方法中来手动确认 rabbitmq 消息,如下所示:

 @StreamListener(target = MySink.INPUT1)
 public void listenForInput1(Message<String> message,
      @Header(AmqpHeaders.CHANNEL) Channel channel,
      @Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag) throws IOException {

    log.info(" received new message [" + message.toString() + "] ");
    channel.basicAck(deliveryTag, false);
 }

我现在正在尝试使用函数实现相同的目的:

 @Bean
 public Consumer<Message<String>> sink1() {
    return message -> {
      System.out.println("******************");
      System.out.println("At Sink1");
      System.out.println("******************");
      System.out.println("Received message " + message.getPayload());
    };
  }

如何在此处获取频道 object 以便我可以使用 deliveryTag 确认它? 我能够获得送货标签表 headers。但是,我无法获取频道 Object.

我弄明白了:

  @Bean
  public Consumer<Message<String>> sink1() {
    return message -> {
      System.out.println("******************");
      System.out.println("At Sink1");
      System.out.println("******************");
      System.out.println("Received message " + message.getPayload());

      Channel channel = message.getHeaders().get(AmqpHeaders.CHANNEL, Channel.class);
      Long deliveryTag = message.getHeaders().get(AmqpHeaders.DELIVERY_TAG, Long.class);

      try {
        channel.basicAck(deliveryTag, false);
      } catch (IOException e) {
        e.printStackTrace();
      }
    };
  }