Kafka Listener Concurreny - 如何处理从 6 个线程触发的 Stop/Idle 事件

Kafka Listener Concurreny - How to handle Stop/Idle events fired from 6 threads

我有 2 个 Kafka 监听器 Daily/Weekly。每天都有 autoStartup = true 和 Weekly 有 autoStartup = false。我有一个端点可以停止每日 运行 并开始每周。一旦 Weekly 完成消费消息,我等待空闲事件(设置为 1 分钟)触发,然后我停止 Weekly。现在我正在听每周开始的 STOP 事件。现在的问题是我将并发设置为 6。所以我得到 6 个空闲事件和 6 个停止事件。我已经处理如下。我想知道这是一个好习惯还是有更好的方法?

我将所有每日停止事件收集在一个 ConcurrentHashMap 中。一旦达到并发计数,这意味着所有 6 个每日侦听器线程都将停止并可以启动每周侦听器。

private void processDailyStopEvent(ConsumerStoppedEvent event)
    {
        LOGGER.info("Processing DAILY Stop events");
        KafkaMessageListenerContainer source = (KafkaMessageListenerContainer) event.getSource();
        ConcurrentMessageListenerContainer container = (ConcurrentMessageListenerContainer) event.getContainer(ConcurrentMessageListenerContainer.class);

        eventMap.get(“dailyStop”).add(source.getListenerId());
        LOGGER.info("Added ListenerId {} to Map<dailyStop>", source.getListenerId());

        if (eventMap.get(“dailyStop”).size() == container.getConcurrency()) {
            LOGGER.info("All DAILY Stop events are captured. Clearing the Map<dailyStop>");
            eventMap.get(“dailyStop”).clear();

            LOGGER.info("Starting WEEKLY Consumer now.");
            kafkaService.startWeeklyConsumer();
        }
    }


你有一个合理的方法。

或者,您可以等待并发容器的停止事件(源等于容器)- 在所有子容器停止后发布。