在 Spring Integration 与 Project Reactor 集成时,无法将回调 lambda 传递给 .toReactivePublisher().subscribe()
Cannot pass callback lambdas into .toReactivePublisher().subscribe() in Spring Integration with Project Reactor
当我将 Spring Integration 与 Project Reactor 一起使用时,我收到了 Caused by: java.lang.IllegalStateException: No subscriptions have been created
错误,我想知道应该如何订阅。我的原始代码是:
/**
 * Declares an integration flow whose source is the publisher returned by
 * {@code reactiveKafkaConsumerTemplate.receiveAutoAck()}.
 *
 * <p>Each received record is wrapped in a {@link GenericMessage}, transformed to
 * the record's value, and sent to the queue channel named "resultChannel".
 *
 * @return the assembled {@link IntegrationFlow}
 */
@Bean
public IntegrationFlow writeToKafka() {
    return IntegrationFlows
            .from(reactiveKafkaConsumerTemplate.receiveAutoAck()
                    .map(consumerRecord ->
                            new GenericMessage<ConsumerRecord<String, String>>(consumerRecord)))
            .<ConsumerRecord<String, String>, String>transform(ConsumerRecord::value)
            .channel(c -> c.queue("resultChannel"))
            .get();
}
在它抛出错误之后,我尝试进行订阅,但我不明白应该向 subscribe 方法传入什么——它似乎与常规的反应式 .subscribe() 不同。
// Second attempt from the question: the same flow as before, but ending in
// .toReactivePublisher().subscribe(...) inside the bean method itself.
// NOTE(review): the method is declared void yet returns an expression, so it does
// not compile as written. It is kept verbatim because the surrounding text
// discusses exactly this attempt.
@Bean
public void writeToKafka() {
return IntegrationFlows.from(reactiveKafkaConsumerTemplate.receiveAutoAck()
// Wrap every received record in a GenericMessage so it can enter the flow.
.map(payload -> {
return new GenericMessage<ConsumerRecord<String, String>>(payload);
}))
// Reduce each ConsumerRecord to its value.
.<ConsumerRecord<String, String>, String>transform(ConsumerRecord::value)
.channel(c -> c.queue("resultChannel"))
// subscribe() is called here, while the bean method is still executing;
// the callback logs every value it receives.
.toReactivePublisher().subscribe(value -> {
log.info("Wrote: " + value);
});
}
不能使用这种 .toReactivePublisher().subscribe() 的组合。IntegrationFlow 必须先作为 bean 公开并完成配置。只有在这之后,把这个 Publisher bean 注入到你的某个服务中,你才能对它调用 subscribe()。
您忽略了一个事实:控制反转(IoC)的依赖注入容器必须先完成初始化,之后我们才能对这些 bean 做实际的工作(订阅)。
编辑
例如,我的测试用例:
/**
 * Test showing how a {@link Publisher} built with {@code toReactivePublisher()} is
 * consumed: the publisher is declared as a bean in the nested configuration class,
 * injected into the test, and only then subscribed to.
 */
@SpringJUnitConfig
@DirtiesContext
public class ReactiveStreamsTests {

    @Autowired
    @Qualifier("pollableReactiveFlow")
    private Publisher<Message<Integer>> pollablePublisher;

    @Autowired
    private AbstractEndpoint reactiveTransformer;

    @Autowired
    @Qualifier("inputChannel")
    private MessageChannel inputChannel;

    /**
     * Sends two comma-separated batches to {@code inputChannel} and consumes the
     * injected publisher with two subscribers: one counting down a latch, one
     * collecting payloads on a separate thread.
     *
     * @throws Exception if waiting on the latch or the future is interrupted,
     *                   times out, or the submitted task fails
     */
    @Test
    void testPollableReactiveFlow() throws Exception {
        assertThat(this.reactiveTransformer).isInstanceOf(ReactiveStreamsConsumer.class);

        this.inputChannel.send(new GenericMessage<>("1,2,3,4,5"));

        CountDownLatch latch = new CountDownLatch(6);

        // First subscriber: takes six messages and counts down once for each one
        // that carries a sequence-number header.
        Flux.from(this.pollablePublisher)
                .take(6)
                .filter(m -> m.getHeaders().containsKey(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))
                .doOnNext(p -> latch.countDown())
                .subscribe();

        ExecutorService exec = Executors.newSingleThreadExecutor();
        try {
            // Second subscriber, run on the executor thread: three locally produced
            // messages followed by whatever the injected publisher emits, capped at seven.
            Future<List<Integer>> future =
                    exec.submit(() ->
                            Flux.just("11,12,13")
                                    .map(v -> v.split(","))
                                    .flatMapIterable(Arrays::asList)
                                    .map(Integer::parseInt)
                                    .<Message<Integer>>map(GenericMessage::new)
                                    .concatWith(this.pollablePublisher)
                                    .take(7)
                                    .map(Message::getPayload)
                                    .collectList()
                                    .block(Duration.ofSeconds(10))
                    );

            this.inputChannel.send(new GenericMessage<>("6,7,8,9,10"));

            assertThat(latch.await(20, TimeUnit.SECONDS)).isTrue();

            List<Integer> integers = future.get(20, TimeUnit.SECONDS);
            assertThat(integers).isNotNull().hasSize(7);
        } finally {
            // Always stop the executor, even when an assertion or future.get() throws,
            // so a failing test does not leave its worker thread behind.
            exec.shutdownNow();
        }
    }

    /**
     * Test configuration that exposes the reactive flow as a {@link Publisher} bean.
     */
    @Configuration
    @EnableIntegration
    public static class ContextConfiguration {

        /**
         * Builds a flow that reads from "inputChannel", splits the payload on commas,
         * parses each piece as an integer through the endpoint with id
         * "reactiveTransformer", passes the result through a queue channel and a
         * log step, and exposes the output as a publisher.
         *
         * @return the publisher bean backing the flow
         */
        @Bean
        public Publisher<Message<Integer>> pollableReactiveFlow() {
            return IntegrationFlows
                    .from("inputChannel")
                    .split(s -> s.delimiters(","))
                    .<String, Integer>transform(Integer::parseInt,
                            e -> e.reactive(flux -> flux.publishOn(Schedulers.parallel())).id("reactiveTransformer"))
                    .channel(MessageChannels.queue())
                    .log()
                    .toReactivePublisher();
        }
    }
}
当我将 Spring Integration 与 Project Reactor 一起使用时,我收到了 Caused by: java.lang.IllegalStateException: No subscriptions have been created
错误,我想知道应该如何订阅。我的原始代码是:
/**
 * Declares an integration flow whose source is the publisher returned by
 * {@code reactiveKafkaConsumerTemplate.receiveAutoAck()}.
 *
 * <p>Each received record is wrapped in a {@link GenericMessage}, transformed to
 * the record's value, and sent to the queue channel named "resultChannel".
 *
 * @return the assembled {@link IntegrationFlow}
 */
@Bean
public IntegrationFlow writeToKafka() {
    return IntegrationFlows
            .from(reactiveKafkaConsumerTemplate.receiveAutoAck()
                    .map(consumerRecord ->
                            new GenericMessage<ConsumerRecord<String, String>>(consumerRecord)))
            .<ConsumerRecord<String, String>, String>transform(ConsumerRecord::value)
            .channel(c -> c.queue("resultChannel"))
            .get();
}
在它抛出错误之后,我尝试进行订阅,但我不明白应该向 subscribe 方法传入什么——它似乎与常规的反应式 .subscribe() 不同。
// Second attempt from the question: the same flow as before, but ending in
// .toReactivePublisher().subscribe(...) inside the bean method itself.
// NOTE(review): the method is declared void yet returns an expression, so it does
// not compile as written. It is kept verbatim because the surrounding text
// discusses exactly this attempt.
@Bean
public void writeToKafka() {
return IntegrationFlows.from(reactiveKafkaConsumerTemplate.receiveAutoAck()
// Wrap every received record in a GenericMessage so it can enter the flow.
.map(payload -> {
return new GenericMessage<ConsumerRecord<String, String>>(payload);
}))
// Reduce each ConsumerRecord to its value.
.<ConsumerRecord<String, String>, String>transform(ConsumerRecord::value)
.channel(c -> c.queue("resultChannel"))
// subscribe() is called here, while the bean method is still executing;
// the callback logs every value it receives.
.toReactivePublisher().subscribe(value -> {
log.info("Wrote: " + value);
});
}
不能使用这种 .toReactivePublisher().subscribe() 的组合。IntegrationFlow 必须先作为 bean 公开并完成配置。只有在这之后,把这个 Publisher bean 注入到你的某个服务中,你才能对它调用 subscribe()。
您忽略了一个事实:控制反转(IoC)的依赖注入容器必须先完成初始化,之后我们才能对这些 bean 做实际的工作(订阅)。
编辑
例如,我的测试用例:
/**
 * Test showing how a {@link Publisher} built with {@code toReactivePublisher()} is
 * consumed: the publisher is declared as a bean in the nested configuration class,
 * injected into the test, and only then subscribed to.
 */
@SpringJUnitConfig
@DirtiesContext
public class ReactiveStreamsTests {

    @Autowired
    @Qualifier("pollableReactiveFlow")
    private Publisher<Message<Integer>> pollablePublisher;

    @Autowired
    private AbstractEndpoint reactiveTransformer;

    @Autowired
    @Qualifier("inputChannel")
    private MessageChannel inputChannel;

    /**
     * Sends two comma-separated batches to {@code inputChannel} and consumes the
     * injected publisher with two subscribers: one counting down a latch, one
     * collecting payloads on a separate thread.
     *
     * @throws Exception if waiting on the latch or the future is interrupted,
     *                   times out, or the submitted task fails
     */
    @Test
    void testPollableReactiveFlow() throws Exception {
        assertThat(this.reactiveTransformer).isInstanceOf(ReactiveStreamsConsumer.class);

        this.inputChannel.send(new GenericMessage<>("1,2,3,4,5"));

        CountDownLatch latch = new CountDownLatch(6);

        // First subscriber: takes six messages and counts down once for each one
        // that carries a sequence-number header.
        Flux.from(this.pollablePublisher)
                .take(6)
                .filter(m -> m.getHeaders().containsKey(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))
                .doOnNext(p -> latch.countDown())
                .subscribe();

        ExecutorService exec = Executors.newSingleThreadExecutor();
        try {
            // Second subscriber, run on the executor thread: three locally produced
            // messages followed by whatever the injected publisher emits, capped at seven.
            Future<List<Integer>> future =
                    exec.submit(() ->
                            Flux.just("11,12,13")
                                    .map(v -> v.split(","))
                                    .flatMapIterable(Arrays::asList)
                                    .map(Integer::parseInt)
                                    .<Message<Integer>>map(GenericMessage::new)
                                    .concatWith(this.pollablePublisher)
                                    .take(7)
                                    .map(Message::getPayload)
                                    .collectList()
                                    .block(Duration.ofSeconds(10))
                    );

            this.inputChannel.send(new GenericMessage<>("6,7,8,9,10"));

            assertThat(latch.await(20, TimeUnit.SECONDS)).isTrue();

            List<Integer> integers = future.get(20, TimeUnit.SECONDS);
            assertThat(integers).isNotNull().hasSize(7);
        } finally {
            // Always stop the executor, even when an assertion or future.get() throws,
            // so a failing test does not leave its worker thread behind.
            exec.shutdownNow();
        }
    }

    /**
     * Test configuration that exposes the reactive flow as a {@link Publisher} bean.
     */
    @Configuration
    @EnableIntegration
    public static class ContextConfiguration {

        /**
         * Builds a flow that reads from "inputChannel", splits the payload on commas,
         * parses each piece as an integer through the endpoint with id
         * "reactiveTransformer", passes the result through a queue channel and a
         * log step, and exposes the output as a publisher.
         *
         * @return the publisher bean backing the flow
         */
        @Bean
        public Publisher<Message<Integer>> pollableReactiveFlow() {
            return IntegrationFlows
                    .from("inputChannel")
                    .split(s -> s.delimiters(","))
                    .<String, Integer>transform(Integer::parseInt,
                            e -> e.reactive(flux -> flux.publishOn(Schedulers.parallel())).id("reactiveTransformer"))
                    .channel(MessageChannels.queue())
                    .log()
                    .toReactivePublisher();
        }
    }
}