Spring Cloud Stream - 功能 - 如何手动确认rabbitmq消息?
Spring Cloud Stream - Functions - How to manually acknowledge rabbitmq message?
我正在使用带有 rabbitbinder 的 spring 云流。
使用@StreamListener,我可以通过将 Channel 和 deliveryTag 注入到方法中来手动确认 rabbitmq 消息,如下所示:
@StreamListener(target = MySink.INPUT1)
public void listenForInput1(Message<String> message,
@Header(AmqpHeaders.CHANNEL) Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag) throws IOException {
log.info(" received new message [" + message.toString() + "] ");
channel.basicAck(deliveryTag, false);
}
我现在正在尝试使用函数实现相同的目的:
@Bean
public Consumer<Message<String>> sink1() {
return message -> {
System.out.println("******************");
System.out.println("At Sink1");
System.out.println("******************");
System.out.println("Received message " + message.getPayload());
};
}
如何在此处获取频道 object 以便我可以使用 deliveryTag 确认它?
我能够获得送货标签表 headers。但是,我无法获取频道 Object.
我弄明白了:
@Bean
public Consumer<Message<String>> sink1() {
return message -> {
System.out.println("******************");
System.out.println("At Sink1");
System.out.println("******************");
System.out.println("Received message " + message.getPayload());
Channel channel = message.getHeaders().get(AmqpHeaders.CHANNEL, Channel.class);
Long deliveryTag = message.getHeaders().get(AmqpHeaders.DELIVERY_TAG, Long.class);
try {
channel.basicAck(deliveryTag, false);
} catch (IOException e) {
e.printStackTrace();
}
};
}
我正在使用带有 rabbitbinder 的 spring 云流。
使用@StreamListener,我可以通过将 Channel 和 deliveryTag 注入到方法中来手动确认 rabbitmq 消息,如下所示:
@StreamListener(target = MySink.INPUT1)
public void listenForInput1(Message<String> message,
@Header(AmqpHeaders.CHANNEL) Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag) throws IOException {
log.info(" received new message [" + message.toString() + "] ");
channel.basicAck(deliveryTag, false);
}
我现在正在尝试使用函数实现相同的目的:
@Bean
public Consumer<Message<String>> sink1() {
return message -> {
System.out.println("******************");
System.out.println("At Sink1");
System.out.println("******************");
System.out.println("Received message " + message.getPayload());
};
}
如何在此处获取频道 object 以便我可以使用 deliveryTag 确认它? 我能够获得送货标签表 headers。但是,我无法获取频道 Object.
我弄明白了:
@Bean
public Consumer<Message<String>> sink1() {
return message -> {
System.out.println("******************");
System.out.println("At Sink1");
System.out.println("******************");
System.out.println("Received message " + message.getPayload());
Channel channel = message.getHeaders().get(AmqpHeaders.CHANNEL, Channel.class);
Long deliveryTag = message.getHeaders().get(AmqpHeaders.DELIVERY_TAG, Long.class);
try {
channel.basicAck(deliveryTag, false);
} catch (IOException e) {
e.printStackTrace();
}
};
}