Cannot pass callback lambdas into .toReactivePublisher().subscribe() in Spring Integration with Project Reactor

When I use Spring Integration with Project Reactor, I get a Caused by: java.lang.IllegalStateException: No subscriptions have been created error, and I would like to know how to subscribe. My original code is:

    @Bean
    public IntegrationFlow writeToKafka() {
        return IntegrationFlows.from(reactiveKafkaConsumerTemplate.receiveAutoAck()
                .map(payload -> {
                    return new GenericMessage<ConsumerRecord<String, String>>(payload);
                }))
            .<ConsumerRecord<String, String>, String>transform(ConsumerRecord::value)
            .channel(c -> c.queue("resultChannel"))
            .get();
    }

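After it threw the error, I tried to subscribe, but I don't understand what I should pass to the subscribe method. It seems to behave differently from the regular reactive .subscribe().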

    @Bean
    public void writeToKafka() {
        return IntegrationFlows.from(reactiveKafkaConsumerTemplate.receiveAutoAck()
                .map(payload -> {
                    return new GenericMessage<ConsumerRecord<String, String>>(payload);
                }))
            .<ConsumerRecord<String, String>, String>transform(ConsumerRecord::value)
            .channel(c -> c.queue("resultChannel"))
            .toReactivePublisher().subscribe(value -> {
                log.info("Wrote: " + value);
            });
    }

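Chaining .toReactivePublisher().subscribe() like this is not correct. The IntegrationFlow must first be exposed and configured as a bean. Only after that bean has been injected somewhere in your service can you subscribe() to that Publisher bean.

You are missing the fact that Inversion of Control has to be initialized in its Dependency Injection container first, and only then can we do some real work (the subscription) against those beans.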
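Applied to the code from the question, the idea could be sketched roughly as follows. This is only an illustration under assumptions: it uses the Spring Integration 5.x IntegrationFlows DSL as in the question, and the names KafkaFlowConfiguration, kafkaValues, and KafkaValueLogger are hypothetical. The flow is declared as a Publisher bean, and the subscription happens in a separate bean once the application context has been refreshed. The injected Publisher is wrapped with Flux.from() so that the lambda-based subscribe() overloads are available.

```java
import java.util.logging.Logger;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.reactivestreams.Publisher;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;
import reactor.core.publisher.Flux;

@Configuration
@EnableIntegration
public class KafkaFlowConfiguration {

    // Step 1: expose the flow as a Publisher bean. Nothing is subscribed here.
    @Bean
    public Publisher<Message<String>> kafkaValues(
            ReactiveKafkaConsumerTemplate<String, String> reactiveKafkaConsumerTemplate) {
        return IntegrationFlows
                .from(reactiveKafkaConsumerTemplate.receiveAutoAck()
                        .map(record -> new GenericMessage<ConsumerRecord<String, String>>(record)))
                .<ConsumerRecord<String, String>, String>transform(ConsumerRecord::value)
                .channel(c -> c.queue("resultChannel"))
                .toReactivePublisher();
    }

    // Step 2: a separate bean receives the Publisher through injection.
    @Bean
    public KafkaValueLogger kafkaValueLogger(
            @Qualifier("kafkaValues") Publisher<Message<String>> kafkaValues) {
        return new KafkaValueLogger(kafkaValues);
    }
}

// Hypothetical consumer of the Publisher bean.
class KafkaValueLogger {

    private static final Logger log = Logger.getLogger(KafkaValueLogger.class.getName());

    private final Publisher<Message<String>> kafkaValues;

    KafkaValueLogger(Publisher<Message<String>> kafkaValues) {
        this.kafkaValues = kafkaValues;
    }

    // Step 3: subscribe only after the container has finished refreshing.
    @EventListener(ContextRefreshedEvent.class)
    public void subscribeToValues() {
        Flux.from(this.kafkaValues)
                .map(Message::getPayload)
                .subscribe(value -> log.info("Wrote: " + value));
    }
}
```

Subscribing from a context event listener is one possible choice here; any code that runs after the bean has been injected would serve the same purpose.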

EDIT

For example, my test case:

@SpringJUnitConfig
@DirtiesContext
public class ReactiveStreamsTests {

    @Autowired
    @Qualifier("pollableReactiveFlow")
    private Publisher<Message<Integer>> pollablePublisher;

    @Autowired
    private AbstractEndpoint reactiveTransformer;

    @Autowired
    @Qualifier("inputChannel")
    private MessageChannel inputChannel;

    @Test
    void testPollableReactiveFlow() throws Exception {
        assertThat(this.reactiveTransformer).isInstanceOf(ReactiveStreamsConsumer.class);
        this.inputChannel.send(new GenericMessage<>("1,2,3,4,5"));

        CountDownLatch latch = new CountDownLatch(6);

        Flux.from(this.pollablePublisher)
                .take(6)
                .filter(m -> m.getHeaders().containsKey(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))
                .doOnNext(p -> latch.countDown())
                .subscribe();

        ExecutorService exec = Executors.newSingleThreadExecutor();
        Future<List<Integer>> future =
                exec.submit(() ->
                        Flux.just("11,12,13")
                                .map(v -> v.split(","))
                                .flatMapIterable(Arrays::asList)
                                .map(Integer::parseInt)
                                .<Message<Integer>>map(GenericMessage::new)
                                .concatWith(this.pollablePublisher)
                                .take(7)
                                .map(Message::getPayload)
                                .collectList()
                                .block(Duration.ofSeconds(10))
                );

        this.inputChannel.send(new GenericMessage<>("6,7,8,9,10"));

        assertThat(latch.await(20, TimeUnit.SECONDS)).isTrue();
        List<Integer> integers = future.get(20, TimeUnit.SECONDS);

        assertThat(integers).isNotNull();
        assertThat(integers.size()).isEqualTo(7);
        exec.shutdownNow();
    }

    @Configuration
    @EnableIntegration
    public static class ContextConfiguration {

        @Bean
        public Publisher<Message<Integer>> pollableReactiveFlow() {
            return IntegrationFlows
                    .from("inputChannel")
                    .split(s -> s.delimiters(","))
                    .<String, Integer>transform(Integer::parseInt,
                            e -> e.reactive(flux -> flux.publishOn(Schedulers.parallel())).id("reactiveTransformer"))
                    .channel(MessageChannels.queue())
                    .log()
                    .toReactivePublisher();
        }

    }

}
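
Note that the test wraps the injected Publisher with Flux.from() before calling subscribe(). The plain org.reactivestreams.Publisher interface only declares subscribe(Subscriber), while the lambda overloads belong to Reactor's Flux. The JDK's java.util.concurrent.Flow mirrors the same contract, so the difference can be sketched without Spring or Reactor on the classpath. In this sketch, LambdaSubscriberSketch, fromLambda, and collect are hypothetical names introduced purely for illustration. The adapter does by hand roughly what a lambda-friendly wrapper has to do for you.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class LambdaSubscriberSketch {

    // Hypothetical adapter: turns two callbacks into a full Flow.Subscriber.
    static <T> Flow.Subscriber<T> fromLambda(Consumer<? super T> itemHandler,
                                             Runnable completionHandler) {
        return new Flow.Subscriber<T>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                // Unbounded demand: ask for every item the publisher emits.
                subscription.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(T item) {
                itemHandler.accept(item);
            }

            @Override
            public void onError(Throwable error) {
                error.printStackTrace();
            }

            @Override
            public void onComplete() {
                completionHandler.run();
            }
        };
    }

    // Publishes the given items and returns what the lambda-based subscriber received.
    static List<String> collect(List<String> items) {
        List<String> received = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(1);
        // Closing the publisher signals onComplete after pending items are delivered.
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            // publisher.subscribe(item -> ...) would not compile: a Subscriber is required.
            publisher.subscribe(fromLambda(received::add, done::countDown));
            items.forEach(publisher::submit);
        }
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for completion", e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(collect(List.of("1", "2", "3")));
    }
}
```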