No subscriptions have been created in Reactor Kafka and Spring Integration
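I'm trying to build a simple flow with Spring Integration and Project Reactor: I consume records with Reactor Kafka, pass them to a channel, and from there produce the messages to another Kafka topic, again using Reactor Kafka.
The consumer flow is: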
@Service
public class ReactiveConsumerService {

    public ReactiveKafkaConsumerTemplate<String, String> reactiveKafkaConsumerTemplate;

    @Qualifier("directChannel")
    @Autowired
    public MessageChannel directChannel;

    public ReactiveConsumerService(ReactiveKafkaConsumerTemplate<String, String> reactiveKafkaConsumerTemplate) {
        this.reactiveKafkaConsumerTemplate = reactiveKafkaConsumerTemplate;
    }

    @Bean
    public IntegrationFlow readFromKafka() {
        return IntegrationFlows.from(reactiveKafkaConsumerTemplate.receiveAutoAck()
                        .map(GenericMessage::new))
                .<ConsumerRecord<String, String>, String>transform(ConsumerRecord::value)
                .<String, String>transform(String::toUpperCase)
                .channel(directChannel)
                .get();
    }
}
The producer flow is:
@Service
public class ReactiveProducerService {

    private final ReactiveKafkaProducerTemplate<String, String> reactiveKafkaProducerTemplate;

    @Qualifier("directChannel")
    @Autowired
    public MessageChannel directChannel;

    public ReactiveProducerService(ReactiveKafkaProducerTemplate<String, String> reactiveKafkaProducerTemplate) {
        this.reactiveKafkaProducerTemplate = reactiveKafkaProducerTemplate;
    }

    @Bean
    public IntegrationFlow kafkaProducerFlow() {
        return IntegrationFlows.from(directChannel)
                .handle(s -> reactiveKafkaProducerTemplate.send("topic2", s.getPayload().toString()))
                .get();
    }
}
I'd like to know how and where I should perform the subscription.
Edit:
I added a .subscribe(), but it still doesn't work:
2022-01-25 20:36:59.570 INFO 1804 --- [ration-sample-1] o.a.kafka.common.utils.AppInfoParser : App info kafka.consumer for consumer-reactive-kafka-spring-integration-sample-1 unregistered
2022-01-25 20:36:59.573 ERROR 1804 --- [oundedElastic-1] reactor.core.publisher.Operators : Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: No subscriptions have been created
Caused by: java.lang.IllegalStateException: No subscriptions have been created
at reactor.kafka.receiver.ReceiverOptions.subscriber(ReceiverOptions.java:423) ~[reactor-kafka-1.3.9.jar:1.3.9]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.FluxPeekFuseable] :
reactor.core.publisher.Flux.doOnRequest
Caused by: java.lang.IllegalStateException: No subscriptions have been created
reactor.kafka.receiver.internals.ConsumerHandler.receive(ConsumerHandler.java:110)
Error has been observed at the following site(s):
*________Flux.doOnRequest ⇢ at reactor.kafka.receiver.internals.ConsumerHandler.receive(ConsumerHandler.java:110)
|_ Flux.filter ⇢ at reactor.kafka.receiver.internals.DefaultKafkaReceiver.lambda$receiveAutoAck(DefaultKafkaReceiver.java:70)
|_ Flux.publishOn ⇢ at reactor.kafka.receiver.internals.DefaultKafkaReceiver.lambda$receiveAutoAck(DefaultKafkaReceiver.java:71)
|_ Flux.map ⇢ at reactor.kafka.receiver.internals.DefaultKafkaReceiver.lambda$receiveAutoAck(DefaultKafkaReceiver.java:72)
*______________Flux.using ⇢ at reactor.kafka.receiver.internals.DefaultKafkaReceiver.lambda$withHandler(DefaultKafkaReceiver.java:137)
*__________Flux.usingWhen ⇢ at reactor.kafka.receiver.internals.DefaultKafkaReceiver.withHandler(DefaultKafkaReceiver.java:129)
|_ ⇢ at reactor.kafka.receiver.internals.DefaultKafkaReceiver.receiveAutoAck(DefaultKafkaReceiver.java:68)
|_ ⇢ at reactor.kafka.receiver.KafkaReceiver.receiveAutoAck(KafkaReceiver.java:124)
|_ Flux.concatMap ⇢ at org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate.receiveAutoAck(ReactiveKafkaConsumerTemplate.java:69)
|_ Flux.map ⇢ at reactor.kafka.spring.integration.samples.service.ReactiveConsumerService.readFromKafka(ReactiveConsumerService.java:38)
|_ Flux.from ⇢ at org.springframework.integration.channel.FluxMessageChannel.subscribeTo(FluxMessageChannel.java:118)
|_ Flux.delaySubscription ⇢ at org.springframework.integration.channel.FluxMessageChannel.subscribeTo(FluxMessageChannel.java:119)
|_ Flux.publishOn ⇢ at org.springframework.integration.channel.FluxMessageChannel.subscribeTo(FluxMessageChannel.java:120)
|_ Flux.doOnNext ⇢ at org.springframework.integration.channel.FluxMessageChannel.subscribeTo(FluxMessageChannel.java:121)
Original Stack Trace:
at reactor.kafka.receiver.ReceiverOptions.subscriber(ReceiverOptions.java:423) ~[reactor-kafka-1.3.9.jar:1.3.9]
at reactor.kafka.receiver.internals.ConsumerEventLoop$SubscribeEvent.run(ConsumerEventLoop.java:207) ~[reactor-kafka-1.3.9.jar:1.3.9]
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68) ~[reactor-core-3.4.14.jar:3.4.14]
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28) ~[reactor-core-3.4.14.jar:3.4.14]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
2022-01-25 20:36:59.772 INFO 1804 --- [ main] o.s.b.web.embedded.netty.NettyWebServer : Netty started on port 8090
2022-01-25 20:36:59.853 INFO 1804 --- [ main] o.a.c.impl.engine.AbstractCamelContext : Routes startup summary (total:0 started:0)
2022-01-25 20:36:59.853 INFO 1804 --- [ main] o.a.c.impl.engine.AbstractCamelContext : Apache Camel 3.12.0 (camel-1) started in 149ms (build:84ms init:59ms start:6ms)
2022-01-25 20:36:59.866 INFO 1804 --- [ main] ReactorKafkaSpringIntegrationApplication : Started ReactorKafkaSpringIntegrationApplication in 4.246 seconds (JVM running for 4.616)
Sample code:
@Service
public class ReactiveProducerService {

    private final ReactiveKafkaProducerTemplate<String, String> reactiveKafkaProducerTemplate;

    @Qualifier("directChannel")
    @Autowired
    public MessageChannel directChannel;

    public ReactiveProducerService(ReactiveKafkaProducerTemplate<String, String> reactiveKafkaProducerTemplate) {
        this.reactiveKafkaProducerTemplate = reactiveKafkaProducerTemplate;
    }

    @Bean
    public IntegrationFlow kafkaProducerFlow() {
        return IntegrationFlows.from(directChannel)
                .handle(s -> reactiveKafkaProducerTemplate.send("topic2", s.getPayload().toString()).subscribe(System.out::println))
                .get();
    }
}
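The subscription to the reactiveKafkaConsumerTemplate happens immediately, as soon as the endpoint for .<ConsumerRecord<String, String>, String>transform(ConsumerRecord::value) is started automatically by the application context.
See this as an alternative: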
/**
* Represent an Integration Flow as a Reactive Streams {@link Publisher} bean.
* @param autoStartOnSubscribe start message production and consumption in the flow,
* when a subscription to the publisher is initiated.
* If this set to true, the flow is marked to not start automatically by the application context.
* @param <T> the expected {@code payload} type
* @return the Reactive Streams {@link Publisher}
* @since 5.5.6
*/
@SuppressWarnings(UNCHECKED)
protected <T> Publisher<Message<T>> toReactivePublisher(boolean autoStartOnSubscribe) {
However, I think you mean the subscription on the outbound side. It is not clear from your question, but reactiveKafkaProducerTemplate has a contract like this:
public Mono<SenderResult<Void>> send(String topic, V value) {
So, you need to subscribe to the returned Mono to initiate the process.
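The reason an unsubscribed send() does nothing is that reactive publishers are typically cold: calling the method only assembles a pipeline, and the work runs when a subscriber requests data. The following is a minimal sketch of that principle using only the JDK's java.util.concurrent.Flow types, so it runs without Reactor on the classpath. The ColdSendDemo class and its send() method are hypothetical stand-ins for illustration, not the ReactiveKafkaProducerTemplate API, and the sketch skips parts of the Reactive Streams contract (for example, signalling an error on a non-positive request).

```java
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;

public class ColdSendDemo {

    // Counts how often the simulated send actually executed.
    public static final AtomicInteger sends = new AtomicInteger();

    // Hypothetical stand-in for a reactive send(): it only builds a cold
    // publisher and performs no work until a subscriber requests an item.
    public static Flow.Publisher<String> send(String topic, String value) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean done;

            @Override
            public void request(long n) {
                if (done || n <= 0) {
                    return; // simplified: a full implementation would signal onError for n <= 0
                }
                done = true;
                sends.incrementAndGet(); // the side effect happens only here
                subscriber.onNext(topic + ":" + value);
                subscriber.onComplete();
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // Returns the send count before and after subscribing.
    public static List<Integer> demo() {
        sends.set(0);
        Flow.Publisher<String> result = send("topic2", "test");
        int before = sends.get(); // nothing has been sent yet
        result.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(1); }
            @Override public void onNext(String item) { }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        });
        int after = sends.get(); // the subscription triggered the send
        return List.of(before, after);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [0, 1]
    }
}
```

A Reactor Mono follows the same rule, which is why the returned value has to be subscribed somewhere, either by your own code or by a framework component that does it for you.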
Note: you also mixed up the arguments of send(). Didn't you mean this: reactiveKafkaProducerTemplate.send("topic2", "test")?
To subscribe to that Mono, you just need to do it yourself in the handle():
.handle(s -> reactiveKafkaProducerTemplate.send("topic2", "test").subscribe())
UPDATE 2
An error such as java.lang.IllegalStateException: No subscriptions have been created, thrown from reactor.kafka.receiver.ReceiverOptions.subscriber(), means that you have not assigned any topics, patterns, or partitions to listen to.
See ReceiverOptions.subscription() or ReceiverOptions.assignment().
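As a rough illustration of that advice, the consumer template could be built from ReceiverOptions that carry a topic subscription. This is only a sketch: the bootstrap server, group id, topic name "topic1", and the ReactiveKafkaConsumerConfig class name are assumptions that do not appear in the question, and your real properties may come from Spring Boot configuration instead.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;
import reactor.kafka.receiver.ReceiverOptions;

@Configuration
public class ReactiveKafkaConsumerConfig {

    @Bean
    public ReceiverOptions<String, String> receiverOptions() {
        Map<String, Object> props = new HashMap<>();
        // Assumed values; replace with your own broker address and group id.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "sample-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        // Use the instance returned by subscription(): in recent Reactor Kafka
        // versions ReceiverOptions is immutable, so ignoring the return value
        // leaves the options without any subscription.
        return ReceiverOptions.<String, String>create(props)
                .subscription(Collections.singletonList("topic1")); // assumed source topic
    }

    @Bean
    public ReactiveKafkaConsumerTemplate<String, String> reactiveKafkaConsumerTemplate(
            ReceiverOptions<String, String> receiverOptions) {
        return new ReactiveKafkaConsumerTemplate<>(receiverOptions);
    }
}
```

If you need fixed partitions rather than group-managed ones, ReceiverOptions.assignment() takes a collection of TopicPartition instead of topic names.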