Multiple StreamListeners to same Topic with Spring Cloud Stream connected to Kafka
I have a Spring Boot application that uses Spring Cloud Stream to connect to Kafka. I am trying to set up two separate stream listener methods for the same Kafka topic.
@StreamListener("countries")
@SendTo("aggregated-statistic")
public KStream<?, AggregatedCountry> process(KStream<Object, Country> input) {
    return input
            .groupBy((key, value) -> value.getCountryCode())
            .aggregate(this::initialize,
                    this::aggregateAmount,
                    materializedAsPersistentStore("countries", Serdes.String(),
                            Serdes.serdeFrom(new JsonSerializer<>(),
                                    new JsonDeserializer<>(AggregatedCountry.class))))
            .toStream()
            .map((key, value) -> new KeyValue<>(null, value));
}

@StreamListener("countries")
@SendTo("daily-statistic")
public KStream<?, List<DailyStatistics>> daily(KStream<Object, Country> input) {
    return input
            .groupBy((key, value) -> value.getCountryCode())
            .aggregate(this::initializeDailyStatistics,
                    this::dailyStatistics,
                    materializedAsPersistentStore("daily", Serdes.String(),
                            Serdes.serdeFrom(new JsonSerializer<>(),
                                    new JsonDeserializer<>(List.class))))
            .toStream()
            .map((key, value) -> new KeyValue<>(null, value));
}
But when I start the Spring Boot application, I get this error.
Exception in thread "kafka-stream-f4f8166b-cbeb-42ca-b461-2b3a23885a5d-StreamThread-1" java.lang.IllegalStateException: Consumer was assigned partitions [kafka-stream-daily-repartition-0] which didn't correspond to subscription request [kafka-stream-countries-repartition, countries]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.handleAssignmentMismatch(ConsumerCoordinator.java:218)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:264)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:424)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:353)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1251)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201)
at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:963)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:863)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:819)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:788)
I suspect each StreamListener method needs its own application ID, but how do I configure that in the application.yml file if both methods listen to the same topic?
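You need to provide two separate input bindings (both bindings can point to the same topic). You cannot use the same binding name on multiple StreamListener methods. You can then set the application.id for each StreamListener-based processor on its input binding, for example
spring.cloud.stream.kafka.streams.bindings.countries1.consumer.applicationId
and
spring.cloud.stream.kafka.streams.bindings.countries2.consumer.applicationId
See this section in the reference docs.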
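As a rough illustration of the two properties above, an application.yml could look something like the sketch below. The binding names countries1 and countries2 come from the property names above. The destination value `countries` is an assumption based on the topic named in the stack trace, and the two applicationId values are hypothetical placeholders.

```yaml
spring:
  cloud:
    stream:
      bindings:
        # Two distinct input bindings that read the same topic
        # (destination name assumed from the stack trace)
        countries1:
          destination: countries
        countries2:
          destination: countries
      kafka:
        streams:
          bindings:
            countries1:
              consumer:
                applicationId: aggregated-statistic-app  # hypothetical value
            countries2:
              consumer:
                applicationId: daily-statistic-app       # hypothetical value
```

With a layout like this, the two methods would presumably be annotated @StreamListener("countries1") and @StreamListener("countries2") instead of both using "countries", and both inputs would need to be declared in the bindings interface.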
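You are reading the "countries" topic twice. It would be better to read from "countries" once and send the data to both "daily-statistic" and "aggregated-statistic".
Reading twice is not the same as processing concurrently. If you want concurrency, configure this parameter: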
spring:
  cloud.stream:
    bindings:
      countries:
        destination: countries-topic
        consumer.concurrency: 6
You can use a topology like this:
@StreamListener("countries")
@SendTo({"daily-statistic", "aggregated-statistic"})