Spring 启动 Kafka 侦听器不一致

Spring Boot Kafka listener is inconsistent

我正在尝试让几个不同的 Spring 云微服务都连接到一个 Kafka/Zookeeper 集群,都在 Kubernetes 中。微服务正在使用 org.springframework.kafka:spring-kafka - 作为事件的消费者和生产者。

所有服务都可以连接到 kafka - 并且主题已创建;然而每个服务的消费者非常不一致。

比如服务启动一次,所有的消费者都会监听消息,调用函数。但是,当我重新启动所有内容(包括 kafka 和 zookeeper)时,它要么无法正常工作,要么不同服务中的某些消费者可以正常工作等...

这是我的一些配置 - 我没有任何基于 Java 的配置 - 只是在我的 application.yml 中,如下所示:

spring:

  ....

  kafka:
    consumer:
      bootstrap-servers: api-kafka.default.svc.cluster.local:9092
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: api-event
      enable-auto-commit: false

    producer:
      bootstrap-servers: api-kafka.default.svc.cluster.local:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      ack-mode: manual

...

我的主要 class:

@EnableCaching
@SpringBootApplication
@EnableJpaRepositories
@EnableDiscoveryClient
@EnableKafka /* <<<<<<<------------- ENABLED HERE */
public class ExampleServiceApplication {

  public static void main(String[] args) {
    SpringApplication.run(ExampleServiceApplication.class, args);
  }

  .....
}

最后,我的消费者:

@Component
public class MessageListener {

  @KafkaListener(
      topics = "myTopic")
  public void eventListener(String serializedMessage) {
    try {
....

消息被很好地发送到代理,但其他服务没有使用。

我发现没有映射到每个服务属性上的主题,我该如何通过 application.yml 做到这一点?

我敢打赌我犯了一个非常愚蠢的错误但是是的!如果有任何意见或帮助,我将不胜感激

顺便说一句,你可以在这里阅读更多关于分区数量和并行消费者(具有相同组id的消费者)数量之间的关系。

https://docs.confluent.io/platform/current/streams/architecture.html

Slightly simplified, the maximum parallelism at which your application may run is bounded by the maximum number of stream tasks, which itself is determined by maximum number of partitions of the input topic(s) the application is reading from. For example, if your input topic has 5 partitions, then you can run up to 5 applications instances. These instances will collaboratively process the topic’s data. If you run a larger number of app instances than partitions of the input topic, the “excess” app instances will launch but remain idle; however, if one of the busy instances goes down, one of the idle instances will resume the former’s work. We provide a more detailed explanation and example in the FAQ.