Reactor Kafka 中没有创建订阅错误
No subscriptions have been created error in Reactor Kafka
消费者配置文件:
在这里,我对键和值都使用了 StringDeserializers。并且已经订阅了一个主题。
@Bean("errorReceiver")
public ReceiverOptions<Object, String> errorConsumerConfig() {
Map<String, Object> errorConsumerProps = new HashMap<>();
errorConsumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, errorBootstrapServers);
errorConsumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "error-consumer");
errorConsumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "error-consumer-1");
errorConsumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
errorConsumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
ReceiverOptions<Object, String> errorReceiverOptions = ReceiverOptions.create(errorConsumerProps);
errorReceiverOptions.subscription(Collections.singleton("order_topic"))
.addAssignListener(partitions -> log.info("onPartitionsAssigned : {}", partitions))
.addRevokeListener(partitions -> log.info("onPartitionsRevoked : {}", partitions));
return errorReceiverOptions;
}
}
消费者代码:
我的登录消费者代码将订阅的主题打印为空。 AppUtility 正在将数据转换为字符串。
@Autowired
@Qualifier("errorReceiver")
private ReceiverOptions<Object, String> errorReceiverOptions;
@EventListener(ApplicationStartedEvent.class)
public Disposable getErrorsTopic() {
Flux<ReceiverRecord<Object, Object>> kafkaFlux = KafkaReceiver.create(errorReceiverOptions).receive();
log.info("subs topics : {}", errorReceiverOptions.subscriptionTopics());
return kafkaFlux.log()
.doOnNext(AppUtility::toBinary)
.doOnError(error -> log.error("error ocurred", error))
.subscribe();
}
日志:
java.lang.IllegalStateException: No subscriptions have been created
at reactor.kafka.receiver.ReceiverOptions.subscriber(ReceiverOptions.java:385) ~[reactor-kafka-1.3.4.jar:1.3.4]
at reactor.kafka.receiver.internals.ConsumerEventLoop$SubscribeEvent.run(ConsumerEventLoop.java:187) ~[reactor-kafka-1.3.4.jar:1.3.4]
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68) [reactor-core-3.4.6.jar:3.4.6]
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28) [reactor-core-3.4.6.jar:3.4.6]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_261]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access1(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_261]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.8.0_261]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_261]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_261]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_261]
pom 导入:
<dependency>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
</dependency>
有人遇到过这样的问题吗?我无法解决这个问题。
您好,请查看下面的示例,根据示例,您缺少类似下面代码的内容
return kafkaFlux.subscribe(record -> {
ReceiverOffset offset = record.receiverOffset();
System.out.printf("Received message: topic-partition=%s offset=%d timestamp=%s key=%d value=%s\n",
offset.topicPartition(),
offset.offset(),
dateFormat.format(new Date(record.timestamp())),
record.key(),
record.value());
offset.acknowledge();
latch.countDown();
});
我忘记了反应式编程中不变性的主要概念。通过将选项分配给另一个选项对象(配置代码中的倒数第四行)解决了这个问题。
public ReceiverOptions<Object, String> errorConsumerConfig() {
Map<String, Object> errorConsumerProps = new HashMap<>();
errorConsumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, errorBootstrapServers);
errorConsumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "error-consumer");
errorConsumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "error-consumer-1");
errorConsumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
errorConsumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
ReceiverOptions<Object, String> errorReceiverOptions = ReceiverOptions.create(errorConsumerProps);
ReceiverOptions<Object, String> options = errorReceiverOptions.subscription(Collections.singleton("order_topic")) // setting the subscription doesn't work unless assigned to an object, reason being immutability
.addAssignListener(partitions -> log.debug("onPartitionsAssigned : {}", partitions))
.addRevokeListener(partitions -> log.debug("onPartitionsRevoked : {}", partitions));
return options; }
编辑:尝试在您的侦听器代码中设置订阅主题。
消费者配置文件: 在这里,我对键和值都使用了 StringDeserializers。并且已经订阅了一个主题。
@Bean("errorReceiver")
public ReceiverOptions<Object, String> errorConsumerConfig() {
Map<String, Object> errorConsumerProps = new HashMap<>();
errorConsumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, errorBootstrapServers);
errorConsumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "error-consumer");
errorConsumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "error-consumer-1");
errorConsumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
errorConsumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
ReceiverOptions<Object, String> errorReceiverOptions = ReceiverOptions.create(errorConsumerProps);
errorReceiverOptions.subscription(Collections.singleton("order_topic"))
.addAssignListener(partitions -> log.info("onPartitionsAssigned : {}", partitions))
.addRevokeListener(partitions -> log.info("onPartitionsRevoked : {}", partitions));
return errorReceiverOptions;
}
}
消费者代码: 我的登录消费者代码将订阅的主题打印为空。 AppUtility 正在将数据转换为字符串。
@Autowired
@Qualifier("errorReceiver")
private ReceiverOptions<Object, String> errorReceiverOptions;
@EventListener(ApplicationStartedEvent.class)
public Disposable getErrorsTopic() {
Flux<ReceiverRecord<Object, Object>> kafkaFlux = KafkaReceiver.create(errorReceiverOptions).receive();
log.info("subs topics : {}", errorReceiverOptions.subscriptionTopics());
return kafkaFlux.log()
.doOnNext(AppUtility::toBinary)
.doOnError(error -> log.error("error ocurred", error))
.subscribe();
}
日志:
java.lang.IllegalStateException: No subscriptions have been created
at reactor.kafka.receiver.ReceiverOptions.subscriber(ReceiverOptions.java:385) ~[reactor-kafka-1.3.4.jar:1.3.4]
at reactor.kafka.receiver.internals.ConsumerEventLoop$SubscribeEvent.run(ConsumerEventLoop.java:187) ~[reactor-kafka-1.3.4.jar:1.3.4]
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68) [reactor-core-3.4.6.jar:3.4.6]
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28) [reactor-core-3.4.6.jar:3.4.6]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_261]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access1(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_261]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.8.0_261]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_261]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_261]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_261]
pom 导入:
<dependency>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
</dependency>
有人遇到过这样的问题吗?我无法解决这个问题。
您好,请查看下面的示例,根据示例,您缺少类似下面代码的内容
return kafkaFlux.subscribe(record -> {
ReceiverOffset offset = record.receiverOffset();
System.out.printf("Received message: topic-partition=%s offset=%d timestamp=%s key=%d value=%s\n",
offset.topicPartition(),
offset.offset(),
dateFormat.format(new Date(record.timestamp())),
record.key(),
record.value());
offset.acknowledge();
latch.countDown();
});
我忘记了反应式编程中不变性的主要概念。通过将选项分配给另一个选项对象(配置代码中的倒数第四行)解决了这个问题。
public ReceiverOptions<Object, String> errorConsumerConfig() {
Map<String, Object> errorConsumerProps = new HashMap<>();
errorConsumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, errorBootstrapServers);
errorConsumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "error-consumer");
errorConsumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "error-consumer-1");
errorConsumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
errorConsumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
ReceiverOptions<Object, String> errorReceiverOptions = ReceiverOptions.create(errorConsumerProps);
ReceiverOptions<Object, String> options = errorReceiverOptions.subscription(Collections.singleton("order_topic")) // setting the subscription doesn't work unless assigned to an object, reason being immutability
.addAssignListener(partitions -> log.debug("onPartitionsAssigned : {}", partitions))
.addRevokeListener(partitions -> log.debug("onPartitionsRevoked : {}", partitions));
return options; }
编辑:尝试在您的侦听器代码中设置订阅主题。