Use a Kafka connection to dynamically Subscribe/Unsubscribe Kafka Topics using Spring Boot

I am developing a Spring Boot application that exposes APIs to sub/unsub Kafka topics. All we need to do is pass the topic name in the API call, and the application will subscribe to it and consume the messages.

Subscribe topic API:

{FQDN}/sub/{topic-name}

Unsubscribe topic API:

{FQDN}/unsub/{topic-name}

Dependency

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.3.1.RELEASE</version>
</dependency>

I created a KafkaConsumerConfiguration class that declares a few beans (below).

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@EnableKafka
@Configuration
public class KafkaConsumerConfiguration {
    private static final String KAFKA_BOOTSTRAP_SERVER = "localhost:9092";

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(kafkaConsumerFactory());
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> kafkaConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerProps());
    }

    @Bean
    public Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BOOTSTRAP_SERVER);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }
}

And I used the ConcurrentMessageListenerContainer.start() method to start a consumer for the given topic when the subscribe API is called.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerService.class);

    // This map stores the running containers, keyed by topic, so that when we
    // need to stop/unsub a topic we can look up its container here.
    // ConcurrentHashMap because the API may be called from multiple request threads.
    private static final Map<String, ConcurrentMessageListenerContainer<String, String>> consumersMap =
            new ConcurrentHashMap<>();

    @Autowired
    private ConsumerFactory<String, String> kafkaConsumerFactory;

    public void createConsumers(String topic, MessageListener<String, String> messageListener) {
        log.info("creating kafka consumer for topic {}", topic);

        ContainerProperties containerProps = new ContainerProperties(topic);
        containerProps.setPollTimeout(100);

        ConcurrentMessageListenerContainer<String, String> container =
                new ConcurrentMessageListenerContainer<>(kafkaConsumerFactory, containerProps);
        container.setupMessageListener(messageListener);

        container.start();

        consumersMap.put(topic, container);

        log.info("created and started kafka consumer for topic {}", topic);
    }

    public void stopConsumer(String topic) {
        log.info("stopping consumer for topic {}", topic);
        // remove() instead of get() so stopped containers don't leak in the map
        ConcurrentMessageListenerContainer<String, String> container = consumersMap.remove(topic);
        if (container != null) {
            container.stop();
            log.info("consumer stopped!! Unsubscribed all topics or patterns and assigned partitions");
        }
    }

}
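For completeness, the two endpoints can be wired to this service with a small REST controller. A minimal sketch (the controller class name and the plain-printing listener are my own assumptions, not part of the original application):

```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

// Hypothetical controller exposing the sub/unsub endpoints described above.
@RestController
public class KafkaSubscriptionController {

    @Autowired
    private KafkaConsumerService kafkaConsumerService;

    @PostMapping("/sub/{topic}")
    public String subscribe(@PathVariable("topic") String topic) {
        // Attach a simple listener that just prints each record's value.
        MessageListener<String, String> listener = record -> System.out.println(record.value());
        kafkaConsumerService.createConsumers(topic, listener);
        return "subscribed to " + topic;
    }

    @PostMapping("/unsub/{topic}")
    public String unsubscribe(@PathVariable("topic") String topic) {
        kafkaConsumerService.stopConsumer(topic);
        return "unsubscribed from " + topic;
    }
}
```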

This solution works fine.

But the problem is that every time I call the API to subscribe to a topic, it creates an AdminClient via AdminClient.create(props) (line 336 in the screenshot below).

This generates logs like the following:

2022-04-26 01:31:11.676  INFO 26021 --- [nio-8888-exec-2] o.a.k.clients.admin.AdminClientConfig    : AdminClientConfig values: 
    bootstrap.servers = [localhost:9092]
    client.dns.lookup = default
    client.id = 
    connections.max.idle.ms = 300000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 120000
    retries = 5
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS

2022-04-26 01:31:12.043  INFO 26021 --- [nio-8888-exec-2] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.3.0
2022-04-26 01:31:12.043  INFO 26021 --- [nio-8888-exec-2] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: fc1aaa116b661c8a
2022-04-26 01:31:12.043  INFO 26021 --- [nio-8888-exec-2] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1650918672031

I don't want the AdminClient instance to be created this way every time, because creating it makes each topic subscription take about 2 seconds. That is not acceptable in my use case.

Required Solution:

The Kafka connection should be created only once; then I can use that connection to sub/unsub Kafka topics. That way the connection is not recreated every time, and time efficiency also improves.

Thanks.

2.3.x has been out of support for a long time:

https://spring.io/projects/spring-kafka#support

The last 2.3.x version was 2.3.14, released last July.

The admin client is used to check whether the topic exists; with older versions this is controlled by the missingTopicsFatal property in ContainerProperties, and in that version it defaulted to true.

With modern versions (since 2.3.4) it defaults to false, so the AdminClient is not created when you start the container.
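If an immediate upgrade is not possible, the check can also be turned off explicitly on the container properties. A sketch, assuming a spring-kafka version where the setMissingTopicsFatal setter is available on ContainerProperties:

```java
import org.springframework.kafka.listener.ContainerProperties;

// Sketch: disable the startup topic-existence check that creates the AdminClient.
// Assumes your spring-kafka version exposes setMissingTopicsFatal on ContainerProperties.
ContainerProperties containerProps = new ContainerProperties("my-topic");
containerProps.setPollTimeout(100);
containerProps.setMissingTopicsFatal(false);
```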

But you really should upgrade to a supported version (2.8.5 is recommended; OSS support for 2.7.x is ending soon).

Reusing a consumer for different topics is not supported; a new consumer is created each time you start a container.