使用 KafkaProperties 配置多个 kafka 主题名称的最佳方法是什么

What is the best way to configure multiple kafka topic names using KafkaProperties

我们有一个应用程序将通过 kafka 与不同的服务进行通信。例如,我需要有 3 个消费者(很可能具有相同的 groupId)和 3 个生产者,每个消费者阅读和写不同的主题。我想利用 KafkaProperties class 来做到这一点。

听众

@Component
@RequiredArgsConstructor // lombok
public class MyKafkaListener {

  @NonNull
  private final EventProcessingService eventProcessingService;

  @NonNull
  private final KafkaProperties kafkaProperties;

  @KafkaListener(topics = #{__kafkaProperties.???})
  public void listenMessageA(final ConsumerRecord<String, MessageA> consumerRecord) {
      // Delegate to eventProcessingService
  }

  @KafkaListener(topics = #{__kafkaProperties.???})
  public void listenMessageB(final ConsumerRecord<String, MessageB> consumerRecord) {
      // Delegate to eventProcessingService
  }

  @KafkaListener(topics = #{__kafkaProperties.???})
  public void listenMessageC(final ConsumerRecord<String, MessageC> consumerRecord) {
      // Delegate to eventProcessingService
  }

}

出版商

@Component
@RequiredArgsConstructor // lombok
public class MyKafkaPublisher<T> {

  @NonNull
  private final KafkaTemplate<String, T> kafkaTemplate;

  @NonNull
  private final KafkaProperties kafkaProperties;

  public void sendMessageX(final MessageX messageX) {
      kafkaTemplate.send(kafkaProperties.???, messageX);
  }

  public void sendMessageY(final MessageY messageY) {
      kafkaTemplate.send(kafkaProperties.???, messageY);
  }

  public void sendMessageZ(final MessageZ messageZ) {
      kafkaTemplate.send(kafkaProperties.???, messageZ);
  }

}

来自 application.yml

的片段
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      groupId: myGroupId
      properties:
        someConsumerProp: someValue
        # Should I add my consumer topic names here?
        topicForA: myFavTopicA
        topicForB: myFavTopicB
        topicForC: myFavTopicC
    producer:
      retries: 10
      properties:
        someProducerProp: someValue
        # Should I add my producer topic names here?
        topicForX: myFavTopicX
        topicForY: myFavTopicY
        topicForZ: myFavTopicZ
    properties:
      someCommonProp: someValue
      # Or may be all topic names here?
      listener.topicForA: myFavTopicA
      listener.topicForB: myFavTopicB
      listener.topicForC: myFavTopicC
      publisher.topicForX: myFavTopicX
      publisher.topicForY: myFavTopicY
      publisher.topicForZ: myFavTopicZ
    template:
      # Bonus question: What is the usage of this property?
      default-topic: myDefTopic

我想知道根据 spring-kafka 作者的建议替换上面 class 中的 ??? 的最佳方法是什么,这样我就不必写任何额外的 @ConfigurationProperties class 或在任何地方使用 @Value 但同时通过阅读 application.yml 本身来保持主题名称的概念清晰。或者作者是否提供了任何不同的方法来解决这种情况?

Boot 团队不将自动配置 Properties class 视为 public,并且可以随时更改,因此请小心在应用程序代码中使用它们。

像这样重载 kafka consumer/producer/admin 属性有点脏(Kafka 本身可能会抱怨其配置中的 "unknown" 属性)。

最好创建自己的 @ConfigurationProperties class.

要具体回答您的问题,请参阅 buildProducerProperties()