Spring Cloud Stream: StreamBridge and Transaction
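I want a Spring Cloud Stream listener to handle everything it sends as part of a single transaction. Instead, every message sent manually with StreamBridge inside the function is committed, even if an exception is thrown afterwards.
Here are my library versions: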
spring : 2.5.5
spring cloud stream : 3.1.4
spring cloud stream rabbit binder : 3.1.4
My Spring Cloud Stream configuration file:
spring:
  cloud:
    function:
      definition: test
    stream:
      rabbit:
        default:
          producer:
            transacted: true
          consumer:
            transacted: true
        bindings:
          test-in-0:
            consumer:
              queueNameGroupOnly: true
              receive-timeout: 500
              transacted: true
          test-out-0:
            producer:
              queueNameGroupOnly: true
              transacted: true
          other-out-0:
            producer:
              queueNameGroupOnly: true
              transacted: true
      bindings:
        test-in-0:
          destination: test.request
          group: test.request
          consumer:
            requiredGroups: test.request
            maxAttempts: 1
        test-out-0:
          destination: test.response
          group: test.response
          producer:
            requiredGroups: test.response
        other-out-0:
          destination: test.other.request
          group: test.other.request
          producer:
            requiredGroups: test.other.request
My test Java code:
@Configuration
public class TestSender {

    @Bean
    public Function<Message<TestRequest>, Message<String>> test(Service service) {
        return (request) -> service.run(request.getPayload().getContent());
    }
}

@Component
@Transactional
public class Service {

    private static final Logger LOGGER = LoggerFactory.getLogger(Service.class);

    StreamBridge bridge;
    IWorker worker;

    public Service(StreamBridge bridge, IWorker worker) {
        this.bridge = bridge;
        this.worker = worker;
    }

    @Transactional
    public Message<String> run(String message) {
        LOGGER.info("Processing {}", message);
        // Manual send through StreamBridge, before the possible failure below
        bridge.send("other-out-0", MessageBuilder.withPayload("test")
                .setHeader("toto", "titi").build());
        if (message.equals("error")) {
            throw new RuntimeException("test error");
        }
        return MessageBuilder.withPayload("test")
                .setHeader("toto", "titi").build();
    }
}
The test class to run:
@SpringBootApplication
public class EmptyWorkerApplication {

    private static final Logger LOGGER = LoggerFactory.getLogger(EmptyWorkerApplication.class);

    public static void main(String[] args) {
        SpringApplication.run(EmptyWorkerApplication.class, args);
    }

    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> {
            LOGGER.info("Sending messages ...");
            template.convertAndSend("test.request", "#",
                    org.springframework.amqp.core.MessageBuilder.withBody(
                            "{\"content\":\"toto\"}".getBytes(StandardCharsets.UTF_8))
                            .setContentType("application/json")
                            .build());
            template.convertAndSend("test.request", "#",
                    org.springframework.amqp.core.MessageBuilder.withBody(
                            "{\"content\":\"error\"}".getBytes(StandardCharsets.UTF_8))
                            .setContentType("application/json")
                            .build());
            template.convertAndSend("test.request", "#",
                    org.springframework.amqp.core.MessageBuilder.withBody(
                            "{\"content\":\"titi\"}".getBytes(StandardCharsets.UTF_8))
                            .setContentType("application/json")
                            .build());
        };
    }
}
I also added a TransactionManager:
@Configuration
@EnableTransactionManagement
public class TransactionManagerConfiguration {

    @Bean(name = "transactionManager")
    public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory cf) {
        RabbitTransactionManager manager = new RabbitTransactionManager(cf);
        return manager;
    }
}
At the end of this example, my RabbitMQ queues contain:
However, I expected only 2 messages on test.other.request. What am I doing wrong?
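To make that expectation concrete, here is a minimal plain-Java model of the behaviour I was hoping for. It does not use Spring or RabbitMQ at all: TxModel, runInTx, handle and the in-memory queue list are hypothetical names invented only for this illustration. Sends made inside one unit of work are buffered and published only if the work finishes without an exception, so the three requests above should leave 2 messages behind.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

public class TxModel {

    // Stands in for the test.other.request queue (hypothetical, in-memory only).
    public static final List<String> queue = new ArrayList<>();

    // Runs one unit of work; buffered sends are published only if it succeeds.
    public static void runInTx(String request, BiConsumer<String, Consumer<String>> work) {
        List<String> pending = new ArrayList<>();
        try {
            work.accept(request, pending::add);
            queue.addAll(pending); // "commit": publish everything that was sent
        } catch (RuntimeException e) {
            // "rollback": the pending sends are simply dropped
        }
    }

    // Mirrors Service.run above: send first, then fail on "error".
    public static void handle(String request, Consumer<String> send) {
        send.accept("test");
        if (request.equals("error")) {
            throw new RuntimeException("test error");
        }
    }

    public static void main(String[] args) {
        for (String request : List.of("toto", "error", "titi")) {
            runInTx(request, TxModel::handle);
        }
        System.out.println(queue.size()); // prints 2
    }
}
```

This is only a sketch of the semantics I expected from transacted bindings, not a description of how the Rabbit binder implements them.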
Edit 1
Code I tried:
@Component("myfunction")
public class Myfunction implements Consumer<String> {

    private final StreamBridge streamBridge;

    public Myfunction(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }

    @Override
    @Transactional
    public void accept(String request) {
        this.streamBridge.send("myfunction-out-0", request);
        if (request.equals("error")) {
            throw new RuntimeException("test error");
        }
    }
}

@SpringBootApplication
public class EmptyWorkerApplication {

    public static void main(String[] args) {
        SpringApplication.run(EmptyWorkerApplication.class, args);
    }

    @Bean
    public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory cf) {
        RabbitTransactionManager manager = new RabbitTransactionManager(cf);
        return manager;
    }

    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> {
            template.convertAndSend("test.request", "#",
                    org.springframework.amqp.core.MessageBuilder.withBody(
                            "test".getBytes(StandardCharsets.UTF_8))
                            .setContentType("text/plain")
                            .build());
            template.convertAndSend("test.request", "#",
                    org.springframework.amqp.core.MessageBuilder.withBody(
                            "error".getBytes(StandardCharsets.UTF_8))
                            .setContentType("text/plain")
                            .build());
            template.convertAndSend("test.request", "#",
                    org.springframework.amqp.core.MessageBuilder.withBody(
                            "test".getBytes(StandardCharsets.UTF_8))
                            .setContentType("text/plain")
                            .build());
        };
    }
}
spring:
  rabbitmq:
    host: xx
    port: xx
    username: xx
    password: xx
    virtual-host: xx
  cloud:
    function:
      definition: myfunction
    stream:
      rabbit:
        bindings:
          myfunction-in-0:
            queueNameGroupOnly: true
          myfunction-out-0:
            queueNameGroupOnly: true
            transacted: true
      bindings:
        myfunction-in-0:
          destination: test.request
          group: test.request
          consumer:
            requiredGroups: test.request
            autoBindDlq: true
            maxAttempts: 1
        myfunction-out-0:
          destination: test.response
          group: test.response
          producer:
            requiredGroups: test.response
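Edit 2:
I finally got it working.
My mistake was setting the property
spring.cloud.stream.rabbit.bindings.myfunction-in-0.consumer.transacted=true
instead of
spring.cloud.stream.rabbit.bindings.myfunction-in.consumer.transacted=true
I don't really understand the difference, and I could not find any explanation in either the Spring Cloud Stream or the Spring Cloud Stream Rabbit binder documentation.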
Please see this answer -
There is also a link to a working example that I just put together for a user in the comments.