如何触发和处理这些 Spring Kafka 事件

How to trigger and handle these Spring Kafka events

在我的 Spring Boot 项目中,我有许多 Spring Kafka 消费者,我添加了一些事件侦听器来监视这些消费者的健康状况。这是代码:

@Component
public class ApplicationContextListeningService {

    @EventListener
    public void handleConsumerPausedEvent(ConsumerPausedEvent event) {
        LOGGER_ERROR.warn(WARNING_KAFKA_CONSUMERPAUSEDEVENT + event.getSource() + LOG_MSG_DELIMITER + event.toString());
    }

    @EventListener
    public void handleConsumerResumedEvent(ConsumerResumedEvent event) {
        LOGGER_ERROR.warn(WARNING_KAFKA_CONSUMERRESUMEDEVENT + event.getSource() + LOG_MSG_DELIMITER + event.toString());
    }

    @EventListener
    public void handleConsumerStoppedEvent(ConsumerStoppedEvent event) {
        LOGGER_ERROR.error(ERROR_KAFKA_CONSUMERSTOPPEDEVENT + event.getSource() + LOG_MSG_DELIMITER + event.toString());
    }

    @EventListener
    public void handleListenerContainerIdleEvent(ListenerContainerIdleEvent event) {
        LOGGER_ERROR.error(ERROR_KAFKA_LISTENERCONTAINERIDLEEVENT + event.getListenerId() + LOG_MSG_DELIMITER + event.toString());
    }

    @EventListener
    public void handleNonResponsiveConsumerEvent(NonResponsiveConsumerEvent event) {
        LOGGER_ERROR.error(ERROR_KAFKA_NONRESPONSIVECONSUMEREVENT + event.getListenerId() + LOG_MSG_DELIMITER + event.toString());
    }

}

有谁知道在什么情况下会抛出这些事件(也许我可以如何手动触发这些事件以进行测试)?还有最后三个事件(ConsumerStoppedEvent、ListenerContainerIdleEvent 和 NonResponsiveConsumerEvent),当我得到其中一个时,是否需要人为干预来解决问题(比如重新启动服务器以再次创建消费者)? 谢谢!

您可以通过将 Mock 消费者工厂注入容器来模拟它们。

  • ConsumerStoppedEvent 当您 stop() 容器时发出。
  • ListenerContainerIdleEvent只是表示在idleEventInterval中没有收到任何记录,所以通常并不意味着有问题。
  • NonResponsiveConsumerEvent - 很难说;对于较旧的客户端,如果服务器关闭,poll() 将阻塞,因此我们无法发出空闲事件(或做任何事情)。

我不知道最近的客户是否还能买到它们;但是要模拟它,您只需要在模拟消费者 poll() 方法中阻塞足够长的时间,以便监视器任务检测到问题并发出事件。