Micronaut 3+ 和 Kafka 集成

Micronaut 3+ and Kafka integration

我目前正在尝试熟悉 micronaut 和 Kafka。我已经按照文档在微服务中创建了一个 KafkaClient,在另一个微服务中创建了一个 KafkaListener。代码如下所示:

客户。这是按预期工作的。我正在兽医评论主题中发布新消息:

@KafkaClient
public interface VetReviewClient {

    @Topic("vet-reviews")
    void send(VetReviewApi vetReviewApi);

}

听众:

@KafkaListener(groupId = "pet-clinic", offsetReset = OffsetReset.EARLIEST)
@AllArgsConstructor
@Slf4j
public class VetReviewListener {

    private VetService vetService;

    @Topic("vet-reviews")
    public void receive(@MessageBody VetReviewApi vetReviewApi) {
        log.info("Received vet review: {}", vetReviewApi);

        vetService.processVetReview(vetReviewApi);
    }

}

虽然在主题中发布了消息,但监听器并没有拉取新消息。我关注了官方micronaut-kafka integration documentation,但是没有用

我知道这个问题是不久前发布的,但我没有看到任何答案,而且我无法通过评论“恢复”这个问题,因为我没有足够的声誉。

Link to old post

更新问题。我已经更改了 Kafka docker 图像,仅此而已。 当前工作 kafka-zookeper-kafdrop 配置:

 zookeeper:
    image: confluentinc/cp-zookeeper:7.0.1
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.0.1
    container_name: kafka
    ports:
      # To learn about configuring Kafka for access across networks see
      # https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://kafka:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

  kafdrop:
    image: obsidiandynamics/kafdrop
    ports:
      - 9100:9000
    environment:
      - KAFKA_BROKERCONNECT=kafka:29092
      - JVM_OPTS=-Xms32M -Xmx64M
    depends_on:
      - kafka