使用函数式编程和流桥的轮询消费者
Polled Consumer With Functional Programming & Stream bridge
我正在使用 spring 云流和 kafka 代理进行微服务相互通信。作为其中的一部分,stream bridge
将用于发送消息,这很好。
但是在消费该消息的过程中,消息不需要立即被消费,而是当满足条件时,才应该被消费。
根据文档,我了解到我需要为此使用 Polled Consumers
(如果我弄错了请纠正我)。
根据我对 documentation 的理解,这是我尝试过的。
application.properties
spring.cloud.stream.pollable-source = consumeResponse
spring.cloud.stream.function.definition = consumeResponse
#stream bridge
spring.cloud.stream.bindings.outputchannel1.destination = REQUEST_TOPIC
spring.cloud.stream.bindings.outputchannel1.binder= kafka1
#polled Consumer
spring.cloud.stream.bindings.consumeResponse-in-0.binder= kafka1
spring.cloud.stream.bindings.consumeResponse-in-0.destination = REQUEST_TOPIC
spring.cloud.stream.bindings.consumeResponse-in-0.group = consumer_cloud_stream1
spring.cloud.stream.binders.kafka1.type=kafka
MainApplication.java
@Bean
public CommandLineRunner commandLineRunner(ApplicationContext ctx) {
return args -> {
//produce message
for (int i = 0; i < 5; i++) {
streamBridge.send("outputchannel1", "msg"+i);
System.out.println("Request :: " + "msg"+i);
}
};
}
@Bean
public Consumer<String> consumeResponse() {
return (response) -> {
//consume message
System.out.println("Response :: " + response);
};
}
@Bean
public ApplicationRunner poller(PollableMessageSource destIn, MessageChannel destOut) {
return args -> {
while (someCondition) { //some condition that checks whether or not to consume the message
try {
//condition satisfied, so forward message to consumer
if (!destIn.poll(m -> {
String newPayload = ((String) m.getPayload());
destOut.send(new GenericMessage<>(newPayload));
})) {
Thread.sleep(1000);
}
} catch (Exception e) {
e.printStackTrace();
}
}
};
}
但这会引发以下异常:-
Parameter 1 of method poller in com.MainApplication required a single bean, but 2 were found:
- nullChannel: defined in null
- errorChannel: defined in null
如果有人能在这里帮助我或指出我的工作示例,我将不胜感激。
Spring开机版本:2.6.4,
Spring云版本:2021.0.1
你为什么不直接将 StreamBridge
注入你的运行器而不是消息通道?
默认情况下,创建流桥输出通道on-demand(第一次发送)。
我正在使用 spring 云流和 kafka 代理进行微服务相互通信。作为其中的一部分,stream bridge
将用于发送消息,这很好。
但是在消费该消息的过程中,消息不需要立即被消费,而是当满足条件时,才应该被消费。
根据文档,我了解到我需要为此使用 Polled Consumers
(如果我弄错了请纠正我)。
根据我对 documentation 的理解,这是我尝试过的。
application.properties
spring.cloud.stream.pollable-source = consumeResponse
spring.cloud.stream.function.definition = consumeResponse
#stream bridge
spring.cloud.stream.bindings.outputchannel1.destination = REQUEST_TOPIC
spring.cloud.stream.bindings.outputchannel1.binder= kafka1
#polled Consumer
spring.cloud.stream.bindings.consumeResponse-in-0.binder= kafka1
spring.cloud.stream.bindings.consumeResponse-in-0.destination = REQUEST_TOPIC
spring.cloud.stream.bindings.consumeResponse-in-0.group = consumer_cloud_stream1
spring.cloud.stream.binders.kafka1.type=kafka
MainApplication.java
@Bean
public CommandLineRunner commandLineRunner(ApplicationContext ctx) {
return args -> {
//produce message
for (int i = 0; i < 5; i++) {
streamBridge.send("outputchannel1", "msg"+i);
System.out.println("Request :: " + "msg"+i);
}
};
}
@Bean
public Consumer<String> consumeResponse() {
return (response) -> {
//consume message
System.out.println("Response :: " + response);
};
}
@Bean
public ApplicationRunner poller(PollableMessageSource destIn, MessageChannel destOut) {
return args -> {
while (someCondition) { //some condition that checks whether or not to consume the message
try {
//condition satisfied, so forward message to consumer
if (!destIn.poll(m -> {
String newPayload = ((String) m.getPayload());
destOut.send(new GenericMessage<>(newPayload));
})) {
Thread.sleep(1000);
}
} catch (Exception e) {
e.printStackTrace();
}
}
};
}
但这会引发以下异常:-
Parameter 1 of method poller in com.MainApplication required a single bean, but 2 were found:
- nullChannel: defined in null
- errorChannel: defined in null
如果有人能在这里帮助我或指出我的工作示例,我将不胜感激。
Spring开机版本:2.6.4, Spring云版本:2021.0.1
你为什么不直接将 StreamBridge
注入你的运行器而不是消息通道?
默认情况下,创建流桥输出通道on-demand(第一次发送)。