使用函数式编程和流桥的轮询消费者

Polled Consumer With Functional Programming & Stream bridge

我正在使用 spring 云流和 kafka 代理进行微服务相互通信。作为其中的一部分,stream bridge 将用于发送消息,这很好。

但是在消费该消息的过程中,消息不需要立即被消费,而是当满足条件时,才应该被消费

根据文档,我了解到我需要为此使用 Polled Consumers(如果我弄错了请纠正我)。

根据我对 documentation 的理解,这是我尝试过的。

application.properties

spring.cloud.stream.pollable-source = consumeResponse
spring.cloud.stream.function.definition = consumeResponse

#stream bridge
spring.cloud.stream.bindings.outputchannel1.destination = REQUEST_TOPIC
spring.cloud.stream.bindings.outputchannel1.binder= kafka1

#polled Consumer
spring.cloud.stream.bindings.consumeResponse-in-0.binder= kafka1
spring.cloud.stream.bindings.consumeResponse-in-0.destination = REQUEST_TOPIC
spring.cloud.stream.bindings.consumeResponse-in-0.group = consumer_cloud_stream1

spring.cloud.stream.binders.kafka1.type=kafka

MainApplication.java

@Bean
public CommandLineRunner commandLineRunner(ApplicationContext ctx) {
    return args -> {
        //produce message
        for (int i = 0; i < 5; i++) {
            streamBridge.send("outputchannel1", "msg"+i);
            System.out.println("Request :: " + "msg"+i);
        }
    };
}


@Bean
public Consumer<String> consumeResponse() {
    return (response) -> {
        //consume message
        System.out.println("Response :: " + response);
    };
}


@Bean
public ApplicationRunner poller(PollableMessageSource destIn, MessageChannel destOut) {
    return args -> {
        while (someCondition) { //some condition that checks whether or not to consume the message
            try {
                //condition satisfied, so forward message to consumer
                if (!destIn.poll(m -> {
                    String newPayload = ((String) m.getPayload());
                    destOut.send(new GenericMessage<>(newPayload));
                })) {
                    Thread.sleep(1000);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    };
}

但这会引发以下异常:-

Parameter 1 of method poller in com.MainApplication required a single bean, but 2 were found:
    - nullChannel: defined in null
    - errorChannel: defined in null

如果有人能在这里帮助我或指出我的工作示例,我将不胜感激。

Spring开机版本:2.6.4, Spring云版本:2021.0.1

你为什么不直接将 StreamBridge 注入你的运行器而不是消息通道?

默认情况下,创建流桥输出通道on-demand(第一次发送)。