使用 KafkaProperties 配置多个 kafka 主题名称的最佳方法是什么
What is the best way to configure multiple kafka topic names using KafkaProperties
我们有一个应用程序将通过 kafka 与不同的服务进行通信。例如,我需要有 3 个消费者(很可能具有相同的 groupId
)和 3 个生产者,每个消费者阅读和写不同的主题。我想利用 KafkaProperties
class 来做到这一点。
听众
@Component
@RequiredArgsConstructor // lombok
public class MyKafkaListener {
@NonNull
private final EventProcessingService eventProcessingService;
@NonNull
private final KafkaProperties kafkaProperties;
@KafkaListener(topics = #{__kafkaProperties.???})
public void listenMessageA(final ConsumerRecord<String, MessageA> consumerRecord) {
// Delegate to eventProcessingService
}
@KafkaListener(topics = #{__kafkaProperties.???})
public void listenMessageB(final ConsumerRecord<String, MessageB> consumerRecord) {
// Delegate to eventProcessingService
}
@KafkaListener(topics = #{__kafkaProperties.???})
public void listenMessageC(final ConsumerRecord<String, MessageC> consumerRecord) {
// Delegate to eventProcessingService
}
}
出版商
@Component
@RequiredArgsConstructor // lombok
public class MyKafkaPublisher<T> {
@NonNull
private final KafkaTemplate<String, T> kafkaTemplate;
@NonNull
private final KafkaProperties kafkaProperties;
public void sendMessageX(final MessageX messageX) {
kafkaTemplate.send(kafkaProperties.???, messageX);
}
public void sendMessageY(final MessageY messageY) {
kafkaTemplate.send(kafkaProperties.???, messageY);
}
public void sendMessageZ(final MessageZ messageZ) {
kafkaTemplate.send(kafkaProperties.???, messageZ);
}
}
来自 application.yml
的片段
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
groupId: myGroupId
properties:
someConsumerProp: someValue
# Should I add my consumer topic names here?
topicForA: myFavTopicA
topicForB: myFavTopicB
topicForC: myFavTopicC
producer:
retries: 10
properties:
someProducerProp: someValue
# Should I add my producer topic names here?
topicForX: myFavTopicX
topicForY: myFavTopicY
topicForZ: myFavTopicZ
properties:
someCommonProp: someValue
# Or may be all topic names here?
listener.topicForA: myFavTopicA
listener.topicForB: myFavTopicB
listener.topicForC: myFavTopicC
publisher.topicForX: myFavTopicX
publisher.topicForY: myFavTopicY
publisher.topicForZ: myFavTopicZ
template:
# Bonus question: What is the usage of this property?
default-topic: myDefTopic
我想知道根据 spring-kafka
作者的建议替换上面 class 中的 ???
的最佳方法是什么,这样我就不必写任何额外的 @ConfigurationProperties
class 或在任何地方使用 @Value
但同时通过阅读 application.yml
本身来保持主题名称的概念清晰。或者作者是否提供了任何不同的方法来解决这种情况?
Boot 团队不将自动配置 Properties
class 视为 public,并且可以随时更改,因此请小心在应用程序代码中使用它们。
像这样重载 kafka consumer/producer/admin 属性有点脏(Kafka 本身可能会抱怨其配置中的 "unknown" 属性)。
最好创建自己的 @ConfigurationProperties
class.
要具体回答您的问题,请参阅 buildProducerProperties()
。
我们有一个应用程序将通过 kafka 与不同的服务进行通信。例如,我需要有 3 个消费者(很可能具有相同的 groupId
)和 3 个生产者,每个消费者阅读和写不同的主题。我想利用 KafkaProperties
class 来做到这一点。
听众
@Component
@RequiredArgsConstructor // lombok
public class MyKafkaListener {
@NonNull
private final EventProcessingService eventProcessingService;
@NonNull
private final KafkaProperties kafkaProperties;
@KafkaListener(topics = #{__kafkaProperties.???})
public void listenMessageA(final ConsumerRecord<String, MessageA> consumerRecord) {
// Delegate to eventProcessingService
}
@KafkaListener(topics = #{__kafkaProperties.???})
public void listenMessageB(final ConsumerRecord<String, MessageB> consumerRecord) {
// Delegate to eventProcessingService
}
@KafkaListener(topics = #{__kafkaProperties.???})
public void listenMessageC(final ConsumerRecord<String, MessageC> consumerRecord) {
// Delegate to eventProcessingService
}
}
出版商
@Component
@RequiredArgsConstructor // lombok
public class MyKafkaPublisher<T> {
@NonNull
private final KafkaTemplate<String, T> kafkaTemplate;
@NonNull
private final KafkaProperties kafkaProperties;
public void sendMessageX(final MessageX messageX) {
kafkaTemplate.send(kafkaProperties.???, messageX);
}
public void sendMessageY(final MessageY messageY) {
kafkaTemplate.send(kafkaProperties.???, messageY);
}
public void sendMessageZ(final MessageZ messageZ) {
kafkaTemplate.send(kafkaProperties.???, messageZ);
}
}
来自 application.yml
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
groupId: myGroupId
properties:
someConsumerProp: someValue
# Should I add my consumer topic names here?
topicForA: myFavTopicA
topicForB: myFavTopicB
topicForC: myFavTopicC
producer:
retries: 10
properties:
someProducerProp: someValue
# Should I add my producer topic names here?
topicForX: myFavTopicX
topicForY: myFavTopicY
topicForZ: myFavTopicZ
properties:
someCommonProp: someValue
# Or may be all topic names here?
listener.topicForA: myFavTopicA
listener.topicForB: myFavTopicB
listener.topicForC: myFavTopicC
publisher.topicForX: myFavTopicX
publisher.topicForY: myFavTopicY
publisher.topicForZ: myFavTopicZ
template:
# Bonus question: What is the usage of this property?
default-topic: myDefTopic
我想知道根据 spring-kafka
作者的建议替换上面 class 中的 ???
的最佳方法是什么,这样我就不必写任何额外的 @ConfigurationProperties
class 或在任何地方使用 @Value
但同时通过阅读 application.yml
本身来保持主题名称的概念清晰。或者作者是否提供了任何不同的方法来解决这种情况?
Boot 团队不将自动配置 Properties
class 视为 public,并且可以随时更改,因此请小心在应用程序代码中使用它们。
像这样重载 kafka consumer/producer/admin 属性有点脏(Kafka 本身可能会抱怨其配置中的 "unknown" 属性)。
最好创建自己的 @ConfigurationProperties
class.
要具体回答您的问题,请参阅 buildProducerProperties()
。