Publisher Confirms with Spring Cloud Stream
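I'm interested in using Publisher Confirms in some of the producers in a project that uses Spring Cloud Stream. I tried a small PoC, but it isn't working. From what I can see in the documentation, this is possible for asynchronous Publisher Confirms, and it should be as simple as making the following changes:
In application.yml, add confirmAckChannel and enable the errorChannelEnabled property.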
spring.cloud.stream:
  binders:
    rabbitDefault:
      defaultCandidate: false
      type: rabbit
      environment.spring.rabbitmq.host: ${spring.rabbitmq.addresses}
  ....
  bindings:
    testOutput:
      destination: test
      binder: rabbitDefault
      content-type: application/json
  rabbit.bindings:
    testOutput.producer:
      confirmAckChannel: "testAck"
      errorChannelEnabled: true
Then there is a simple service, triggered by an endpoint, where I add the errorChannel-related header to the event.
@Service
@RequiredArgsConstructor
public class TestService {

    private final TestPublisher testPublisher;

    public void sendMessage() {
        testPublisher.send(addHeaders());
    }

    private Message<Event<TestEvent>> addHeaders() {
        return withPayload(new Event<>(TestEvent.builder().build()))
                .setHeader(MessageHeaders.ERROR_CHANNEL, "errorChannelTest")
                .build();
    }
}
Then the RabbitMQ publisher:
@Component
@RequiredArgsConstructor
public class TestPublisher {

    private final MessagingChannels messagingChannels;

    public boolean send(Message<Event<TestEvent>> message) {
        return messagingChannels.test().send(message);
    }
}
where MessagingChannels is implemented as
public interface MessagingChannels {

    @Input("testAck")
    MessageChannel testAck();

    @Input("errorChannelTest")
    MessageChannel testError();

    @Output("testOutput")
    MessageChannel test();
}
After that, I implemented two listeners, one for the errorChannelTest input and the other for testAck.
@Slf4j
@Component
@RequiredArgsConstructor
class TestErrorListener {

    @StreamListener("errorChannelTest")
    void onCommandReceived(Event<Message> message) {
        log.info("Message error received: " + message);
    }
}

@Slf4j
@Component
@RequiredArgsConstructor
class TestAckListener {

    @StreamListener("testAck")
    void onCommandReceived(Event<Message> message) {
        log.info("Message ACK received: " + message);
    }
}
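However, I don't receive any ACK or NACK from RabbitMQ in either listener. The event is sent to RabbitMQ correctly and handled by the exchange, but I never get any response back from RabbitMQ.
Am I missing something? I also tried these two properties, but they made no difference: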
spring:
  rabbitmq:
    publisher-confirm-type: CORRELATED
    publisher-returns: true
I'm using Spring-Cloud-Stream 3.0.1.RELEASE and spring-cloud-starter-stream-rabbit 3.0.1.RELEASE.
-----EDITED-----
Here is the example, updated following Gary Russell's suggestions.
Application.yml
spring.cloud.stream:
  binders:
    rabbitDefault:
      defaultCandidate: false
      type: rabbit
      environment.spring.rabbitmq.host: ${spring.rabbitmq.addresses}
  bindings:
    testOutput:
      destination: exchange.output.test
      binder: rabbitDefault
      content-type: application/json
    testOutput.producer:
      errorChannelEnabled: true
  rabbit.bindings:
    testOutput.producer:
      confirmAckChannel: "testAck"
spring:
  rabbitmq:
    publisher-confirm-type: correlated
    publisher-returns: true
TestService
@Service
@RequiredArgsConstructor
public class TestService {

    private final TestPublisher testPublisher;

    // Accepts the request body so it matches the controller's call and addHeaders(Test).
    public void sendMessage(Test test) {
        testPublisher.send(addHeaders(test));
    }

    private Message<Event<TestEvent>> addHeaders(Test test) {
        return withPayload(new Event<>(TestEvent.builder().test(test).build()))
                .build();
    }
}
TestService is triggered by an endpoint in the following simple controller, which I use to check this PoC.
@RestController
@RequiredArgsConstructor
public class TestController {

    private final TestService testService;

    @PostMapping("/services/v1/test")
    public ResponseEntity<Object> test(@RequestBody Test test) {
        testService.sendMessage(test);
        return ResponseEntity.ok().build();
    }
}
Then the RabbitMQ publisher, now with two ServiceActivators:
@Slf4j // needed for the log field used below
@Component
@RequiredArgsConstructor
public class TestPublisher {

    private final MessagingChannels messagingChannels;

    public boolean send(Message<Event<TestEvent>> message) {
        log.info("Message for Testing Publisher confirms sent: " + message);
        return messagingChannels.test().send(message);
    }

    // TEST_ACK and TEST_ERROR are constants defined elsewhere (not shown in the post).
    @ServiceActivator(inputChannel = TEST_ACK)
    public void acks(Message<?> ack) {
        log.info("Message ACK received for Test: " + ack);
    }

    @ServiceActivator(inputChannel = TEST_ERROR)
    public void errors(Message<?> error) {
        log.info("Message error for Test received: " + error);
    }
}
where MessagingChannels is implemented as
public interface MessagingChannels {

    @Input("testAck")
    MessageChannel testAck();

    @Input("testOutput.errors")
    MessageChannel testError();

    @Output("testOutput")
    MessageChannel test();
}
This is the application's main class (I also tried @EnableIntegration).
@EnableBinding(MessagingChannels.class)
@SpringBootApplication
@EnableScheduling
public class Main {

    public static void main(String[] args) {
        SpringApplication.run(Main.class, args);
    }
}
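testAck should not be a binding; it should be a @ServiceActivator instead.
.setHeader(MessageHeaders.ERROR_CHANNEL, "errorChannelTest")
That won't work in this context; errors are sent to a channel named testOutput.errors. Again, this needs a @ServiceActivator, not a binding.
Your errorChannelEnabled is in the wrong place; it is a common producer property, not a Rabbit-specific one.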
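import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;

@SpringBootApplication
@EnableBinding(Source.class)
public class So62219823Application {

    public static void main(String[] args) {
        SpringApplication.run(So62219823Application.class, args);
    }

    // Polled source that publishes "foo" to the "output" binding.
    @InboundChannelAdapter(channel = "output")
    public String source() {
        return "foo";
    }

    // Receives publisher confirms; matches confirm-ack-channel in the YAML.
    @ServiceActivator(inputChannel = "acks")
    public void acks(Message<?> ack) {
        System.out.println("Ack: " + ack);
    }

    // Receives send failures for the "output" binding.
    @ServiceActivator(inputChannel = "output.errors")
    public void errors(Message<?> error) {
        System.out.println("Error: " + error);
    }
}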
spring:
  cloud:
    stream:
      bindings:
        output:
          producer:
            error-channel-enabled: true
      rabbit:
        bindings:
          output:
            producer:
              confirm-ack-channel: acks
  rabbitmq:
    publisher-confirm-type: correlated
    publisher-returns: true
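To see why a handler on errorChannelTest never fires, here is a plain-Java toy model of the routing described in this answer. It is not Spring's API and does not talk to RabbitMQ: ChannelRegistry, ProducerBinding and onConfirm are hypothetical names made up for illustration. The only things taken from the answer are the channel names: acks go to the channel named by confirm-ack-channel, and failures go to the binding name followed by .errors.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Toy model only: none of these types are Spring classes.
public class ConfirmRoutingSketch {

    // Hypothetical registry mapping a channel name to its subscribed handlers.
    static final class ChannelRegistry {
        private final Map<String, List<Consumer<String>>> handlers = new HashMap<>();

        void subscribe(String channelName, Consumer<String> handler) {
            handlers.computeIfAbsent(channelName, k -> new ArrayList<>()).add(handler);
        }

        // Delivers to every handler on the channel; returns false if nobody listens.
        boolean send(String channelName, String message) {
            List<Consumer<String>> subscribed =
                    handlers.getOrDefault(channelName, Collections.emptyList());
            subscribed.forEach(handler -> handler.accept(message));
            return !subscribed.isEmpty();
        }
    }

    // Hypothetical stand-in for a producer binding with confirms enabled.
    static final class ProducerBinding {
        private final String bindingName;
        private final String confirmAckChannel;
        private final ChannelRegistry registry;

        ProducerBinding(String bindingName, String confirmAckChannel, ChannelRegistry registry) {
            this.bindingName = bindingName;
            this.confirmAckChannel = confirmAckChannel;
            this.registry = registry;
        }

        // Acks go to the configured ack channel; failures go to "<bindingName>.errors".
        boolean onConfirm(String payload, boolean ack) {
            String target = ack ? confirmAckChannel : bindingName + ".errors";
            return registry.send(target, payload);
        }
    }

    public static void main(String[] args) {
        ChannelRegistry registry = new ChannelRegistry();
        registry.subscribe("testAck", m -> System.out.println("Ack: " + m));
        registry.subscribe("testOutput.errors", m -> System.out.println("Error: " + m));
        // Subscribed, but nothing in this model ever targets this name.
        registry.subscribe("errorChannelTest", m -> System.out.println("Never printed: " + m));

        ProducerBinding binding = new ProducerBinding("testOutput", "testAck", registry);
        binding.onConfirm("event-1", true);   // prints "Ack: event-1"
        binding.onConfirm("event-2", false);  // prints "Error: event-2"
    }
}
```

In the real application the two subscribe calls correspond to the @ServiceActivator methods on testAck and testOutput.errors; the model is only meant to show that the target channel names come from configuration and the binding name, not from a header set on the outgoing message.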