使用 Kafka 连接动态地 Subscribe/Unsubscribe 使用 Spring Boot 的 Kafka 主题
Use Kakfa connection to dynamically Subscribe/Unsubscribe the Kafka Topics using Spring Boot
我正在开发一个 SpringBoot 应用程序,它将 API 暴露给 sub/unsub Kafka 主题。我们需要做的就是在 API 调用中传递 topic-name
,应用程序将订阅它并使用消息。
订阅主题API:
{FQDN}/sub/{topic-name}
退订主题 API :
{FQDN}/unsub/{topic-name}
依赖关系
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.3.1.RELEASE</version>
</dependency>
我创建了KafkaConsumerConfiguration
,其中声明了一些豆子(如下)。
@EnableKafka
@Configuration
public class KafkaConsumerConfiguration {
private static final String KAFKA_BOOTSTRAP_SERVER = "localhost:9092";
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(kafkaConsumerFactory());
return factory;
}
@Bean
public ConsumerFactory<String, String> kafkaConsumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerProps());
}
@Bean
public Map<String, Object> consumerProps() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BOOTSTRAP_SERVER);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return props;
}
}
并且我使用了 ConcurrentMessageListenerContainer.start()
方法来针对 topic-id
.
调用订阅 API
@Service
public class KafkaConsumerService {
// This map will be used to store the running consumers info.
// So that when we need to stop/unsub the topic, then we can get the container from this map
private static Map<String, ConcurrentMessageListenerContainer<String, String>> consumersMap = new HashMap<>();
@Autowired
private ConsumerFactory<String, String> kafkaConsumerFactory;
public void createConsumers(String topic,MessageListener messageListener) {
log.info("creating kafka consumer for topic {}", topic);
ContainerProperties containerProps = new ContainerProperties(topic);
containerProps.setPollTimeout(100);
ConcurrentMessageListenerContainer<String, String> container =new ConcurrentMessageListenerContainer<>(kafkaConsumerFactory, containerProps);
container.setupMessageListener(messageListener);
container.start();
consumersMap.put(topic, container);
log.info("created and started kafka consumer for topic {}", topic);
}
public void stopConsumer(String topic) {
log.info("stopping consumer for topic {}", topic);
ConcurrentMessageListenerContainer<String, String> container = consumersMap.get(topic);
if (container != null) {
container.stop();
log.info("consumer stopped!! Unsubscribed all topics or patterns and assigned partitions");
}
}
}
这个解决方案工作得很好。喜欢,
- 这是 sub/unsub使用 APIs
编写 kafka 主题
- 也能正常接收消息。
但问题是,
每次当我调用 API 订阅主题时,它都会创建 AdminClient.create(props)
(下图第 336 行)
这会生成如下 logs,
2022-04-26 01:31:11.676 INFO 26021 --- [nio-8888-exec-2] o.a.k.clients.admin.AdminClientConfig : AdminClientConfig values:
bootstrap.servers = [localhost:9092]
client.dns.lookup = default
client.id =
connections.max.idle.ms = 300000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 120000
retries = 5
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
2022-04-26 01:31:12.043 INFO 26021 --- [nio-8888-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.3.0
2022-04-26 01:31:12.043 INFO 26021 --- [nio-8888-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: fc1aaa116b661c8a
2022-04-26 01:31:12.043 INFO 26021 --- [nio-8888-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1650918672031
我不希望 AdminClient 以这种方式每次 time.Because 创建此实例,创建主题订阅大约需要 2 秒。这在我的用例中是不可接受的。
Required Solution:
kafka 连接只会创建一次。然后我可以使用那个连接 sub/unub kafka 主题。这样每次它都不会创建此连接并且时间效率也会提高。
谢谢。
2.3.x很久没支持了
https://spring.io/projects/spring-kafka#support
最后一个 2.3.x 版本是去年 7 月的 2.3.14。
admin用于检查topic是否存在;使用旧版本,由 ContainerProperties
中的 missingTopicsFatal
属性 控制;那个版本是真的。
对于现代版本(自 2.3.4 起),它是 false,因此当您启动容器时不会创建 AdminClient。
但您确实需要升级到受支持的版本(建议使用 2.8.5 - 2.7.x 即将停止对 OSS 的支持)。
不支持重用不同(主题)的消费者;每次启动容器时都会创建一个新的消费者。
我正在开发一个 SpringBoot 应用程序,它将 API 暴露给 sub/unsub Kafka 主题。我们需要做的就是在 API 调用中传递 topic-name
,应用程序将订阅它并使用消息。
订阅主题API:
{FQDN}/sub/{topic-name}
退订主题 API :
{FQDN}/unsub/{topic-name}
依赖关系
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.3.1.RELEASE</version>
</dependency>
我创建了KafkaConsumerConfiguration
,其中声明了一些豆子(如下)。
@EnableKafka
@Configuration
public class KafkaConsumerConfiguration {
private static final String KAFKA_BOOTSTRAP_SERVER = "localhost:9092";
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(kafkaConsumerFactory());
return factory;
}
@Bean
public ConsumerFactory<String, String> kafkaConsumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerProps());
}
@Bean
public Map<String, Object> consumerProps() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BOOTSTRAP_SERVER);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return props;
}
}
并且我使用了 ConcurrentMessageListenerContainer.start()
方法来针对 topic-id
.
@Service
public class KafkaConsumerService {
// This map will be used to store the running consumers info.
// So that when we need to stop/unsub the topic, then we can get the container from this map
private static Map<String, ConcurrentMessageListenerContainer<String, String>> consumersMap = new HashMap<>();
@Autowired
private ConsumerFactory<String, String> kafkaConsumerFactory;
public void createConsumers(String topic,MessageListener messageListener) {
log.info("creating kafka consumer for topic {}", topic);
ContainerProperties containerProps = new ContainerProperties(topic);
containerProps.setPollTimeout(100);
ConcurrentMessageListenerContainer<String, String> container =new ConcurrentMessageListenerContainer<>(kafkaConsumerFactory, containerProps);
container.setupMessageListener(messageListener);
container.start();
consumersMap.put(topic, container);
log.info("created and started kafka consumer for topic {}", topic);
}
public void stopConsumer(String topic) {
log.info("stopping consumer for topic {}", topic);
ConcurrentMessageListenerContainer<String, String> container = consumersMap.get(topic);
if (container != null) {
container.stop();
log.info("consumer stopped!! Unsubscribed all topics or patterns and assigned partitions");
}
}
}
这个解决方案工作得很好。喜欢,
- 这是 sub/unsub使用 APIs 编写 kafka 主题
- 也能正常接收消息。
但问题是,
每次当我调用 API 订阅主题时,它都会创建 AdminClient.create(props)
(下图第 336 行)
这会生成如下 logs,
2022-04-26 01:31:11.676 INFO 26021 --- [nio-8888-exec-2] o.a.k.clients.admin.AdminClientConfig : AdminClientConfig values:
bootstrap.servers = [localhost:9092]
client.dns.lookup = default
client.id =
connections.max.idle.ms = 300000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 120000
retries = 5
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
2022-04-26 01:31:12.043 INFO 26021 --- [nio-8888-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.3.0
2022-04-26 01:31:12.043 INFO 26021 --- [nio-8888-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: fc1aaa116b661c8a
2022-04-26 01:31:12.043 INFO 26021 --- [nio-8888-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1650918672031
我不希望 AdminClient 以这种方式每次 time.Because 创建此实例,创建主题订阅大约需要 2 秒。这在我的用例中是不可接受的。
Required Solution:
kafka 连接只会创建一次。然后我可以使用那个连接 sub/unub kafka 主题。这样每次它都不会创建此连接并且时间效率也会提高。
谢谢。
2.3.x很久没支持了
https://spring.io/projects/spring-kafka#support
最后一个 2.3.x 版本是去年 7 月的 2.3.14。
admin用于检查topic是否存在;使用旧版本,由 ContainerProperties
中的 missingTopicsFatal
属性 控制;那个版本是真的。
对于现代版本(自 2.3.4 起),它是 false,因此当您启动容器时不会创建 AdminClient。
但您确实需要升级到受支持的版本(建议使用 2.8.5 - 2.7.x 即将停止对 OSS 的支持)。
不支持重用不同(主题)的消费者;每次启动容器时都会创建一个新的消费者。