Kafka Consumer Intermittently Unable to Resolve Listener Method

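I have been facing the exception below on the Kafka consumer side. Surprisingly, the issue is intermittent, and an older version of the code (with exactly the same configuration, but without some new, unrelated features) works as expected. Can anyone help identify what might be causing this?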

[ERROR][938f3c68-f481-4224-b2c6-43af5fb27ada-0-C-1][org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer] - Error handler threw an exception
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.mycompany.listener.KafkaBatchListener.onMessage(java.lang.Object,org.springframework.kafka.support.Acknowledgment)]
Bean [com.mycompany.listener.KafkaBatchListener@7a59780b]; nested exception is org.springframework.messaging.handler.invocation.MethodArgumentResolutionException: Could not resolve method parameter at index 0 in public void com.mycompany.listener.KafkaBatchListener.onMessage(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K, V>>,org.springframework.kafka.support.Acknowledgment): Could not resolve parameter [0] in public void com.mycompany.listener.KafkaBatchListener.onMessage(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K, V>>,org.springframework.kafka.support.Acknowledgment): No suitable resolver, failedMessage=GenericMessage [payload=[[B@21bc784f, MyPOJO(), [B@33bb5851], headers={kafka_offset=[4046, 4047, 4048], kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@4871203f, kafka_timestampType=[CREATE_TIME, CREATE_TIME, CREATE_TIME], kafka_receivedPartitionId=[0, 0, 0], kafka_receivedMessageKey=[[B@295620f1, MyPOJOKey(id=0), [B@5d3d6361], kafka_batchConvertedHeaders=[{myFirstHeader=[B@1f011689, myUUIDHeader=[B@7691bce8, myMetadataHeader=[B@6e585b63, myRequestIdHeader=[B@58c81ba2, myMetricsHeader=[B@4f6aeb6c, myTargetHeader=[B@34677895}, {myUUIDHeader=[B@1848ae39, myMetadataHeader=[B@c5b399, myRequestIdHeader=[B@186c1966, myMetricsHeader=[B@1740692e, myTargetHeader=[B@4a242499}, {myUUIDHeader=[B@67d01f3f, myMetadataHeader=[B@1f0f9d8a, myRequestIdHeader=[B@b928e5c, isLastMessage=[B@6079735b, myMetricsHeader=[B@7b7b18c, myTargetHeader=[B@64378f3d}], kafka_receivedTopic=[my_topic, my_topic, my_topic], kafka_receivedTimestamp=[1623420136620, 1623420137255, 1623420137576], kafka_acknowledgment=Acknowledgment for org.apache.kafka.clients.consumer.ConsumerRecords@7bc81d89, kafka_groupId=dev-consumer-grp}]
    at org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler.handle(SeekToCurrentBatchErrorHandler.java:77) ~[spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.RecoveringBatchErrorHandler.handle(RecoveringBatchErrorHandler.java:124) ~[spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.ContainerAwareBatchErrorHandler.handle(ContainerAwareBatchErrorHandler.java:56) ~[spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchErrorHandler(KafkaMessageListenerContainer.java:2010) ~[spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:1854) [spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchListener(KafkaMessageListenerContainer.java:1720) [spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1699) [spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1272) [spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1264) [spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1161) [spring-kafka-2.7.1.jar:2.7.1]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
    at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
    at java.lang.Thread.run(Thread.java:834) [?:?]
Caused by: org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.mycompany.listener.KafkaBatchListener.onMessage(java.lang.Object,org.springframework.kafka.support.Acknowledgment)]
Bean [com.mycompany.listener.KafkaBatchListener@7a59780b]; nested exception is org.springframework.messaging.handler.invocation.MethodArgumentResolutionException: Could not resolve method parameter at index 0 in public void com.mycompany.listener.KafkaBatchListener.onMessage(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K, V>>,org.springframework.kafka.support.Acknowledgment): Could not resolve parameter [0] in public void com.mycompany.listener.KafkaBatchListener.onMessage(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K, V>>,org.springframework.kafka.support.Acknowledgment): No suitable resolver, failedMessage=GenericMessage [payload=[[B@21bc784f, MyPOJO(), [B@33bb5851], headers={kafka_offset=[4046, 4047, 4048], kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@4871203f, kafka_timestampType=[CREATE_TIME, CREATE_TIME, CREATE_TIME], kafka_receivedPartitionId=[0, 0, 0], kafka_receivedMessageKey=[[B@295620f1, MyPOJOKey(id=0), [B@5d3d6361], kafka_batchConvertedHeaders=[{myFirstHeader=[B@1f011689, myUUIDHeader=[B@7691bce8, myMetadataHeader=[B@6e585b63, myRequestIdHeader=[B@58c81ba2, myMetricsHeader=[B@4f6aeb6c, myTargetHeader=[B@34677895}, {myUUIDHeader=[B@1848ae39, myMetadataHeader=[B@c5b399, myRequestIdHeader=[B@186c1966, myMetricsHeader=[B@1740692e, myTargetHeader=[B@4a242499}, {myUUIDHeader=[B@67d01f3f, myMetadataHeader=[B@1f0f9d8a, myRequestIdHeader=[B@b928e5c, isLastMessage=[B@6079735b, myMetricsHeader=[B@7b7b18c, myTargetHeader=[B@64378f3d}], kafka_receivedTopic=[my_topic, my_topic, my_topic], kafka_receivedTimestamp=[1623420136620, 1623420137255, 1623420137576], kafka_acknowledgment=Acknowledgment for org.apache.kafka.clients.consumer.ConsumerRecords@7bc81d89, kafka_groupId=dev-consumer-grp}]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2367) ~[spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2003) ~[spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessageWithRecordsOrList(KafkaMessageListenerContainer.java:1973) ~[spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessage(KafkaMessageListenerContainer.java:1925) ~[spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:1837) ~[spring-kafka-2.7.1.jar:2.7.1]
    ... 8 more
Caused by: org.springframework.messaging.handler.invocation.MethodArgumentResolutionException: Could not resolve method parameter at index 0 in public void com.mycompany.listener.KafkaBatchListener.onMessage(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K, V>>,org.springframework.kafka.support.Acknowledgment): Could not resolve parameter [0] in public void com.mycompany.listener.KafkaBatchListener.onMessage(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K, V>>,org.springframework.kafka.support.Acknowledgment): No suitable resolver
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:145) ~[spring-messaging-5.2.12.RELEASE.jar:5.2.12.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:116) ~[spring-messaging-5.2.12.RELEASE.jar:5.2.12.RELEASE]
    at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:56) ~[spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:339) ~[spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.invoke(BatchMessagingMessageListenerAdapter.java:180) ~[spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:172) ~[spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:61) ~[spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:1983) ~[spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessageWithRecordsOrList(KafkaMessageListenerContainer.java:1973) ~[spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessage(KafkaMessageListenerContainer.java:1925) ~[spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:1837) ~[spring-kafka-2.7.1.jar:2.7.1]
    ... 8 more

My application uses the following:

  1. A custom listener class com.mycompany.listener.KafkaBatchListener<K, V> that implements org.springframework.kafka.listener.BatchAcknowledgingMessageListener<K, V> and overrides onMessage(List<ConsumerRecord<K, V>> consumerRecords, Acknowledgment acknowledgment), which carries a custom marker annotation @MyKafkaListener.
  2. A custom container factory that extends org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory<K, V> and configures setConsumerFactory(consumerFactory), setBatchErrorHandler(errorHandler), setBatchListener(true), and ContainerProperties.setOnlyLogRecordMetadata(true).
  3. A Spring Boot @Configuration class that implements org.springframework.kafka.annotation.KafkaListenerConfigurer and is responsible for configuring org.springframework.kafka.core.DefaultKafkaConsumerFactory<K, V>, org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory<K, V>, and org.springframework.kafka.config.MethodKafkaListenerEndpoint<String, String> (used with @MyKafkaListener).
  4. Spring Kafka 2.7.1

Supplementary question: even with ContainerProperties.setOnlyLogRecordMetadata(true) set, the exception stack trace still contains the full payload (which I have omitted here). Any idea why?

Thanks in advance!


Update:

  1. KafkaBatchListener
package com.mycompany.listener;

import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.BatchAcknowledgingMessageListener;
import org.springframework.kafka.support.Acknowledgment;

public class KafkaBatchListener<K, V> implements BatchAcknowledgingMessageListener<K, V> {

    @Override
    @com.mycompany.listener.KafkaListener
    public void onMessage(final List<ConsumerRecord<K, V>> consumerRecords, final Acknowledgment acknowledgment) {

        // process batch using MyService<K, V>.process(consumerRecords)
        acknowledgment.acknowledge();
    }
}

  2. Custom annotation
package com.mycompany.listener;

import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

import java.lang.annotation.Retention;
import java.lang.annotation.Target;

@Retention(RUNTIME)
@Target(METHOD)
public @interface KafkaListener {

}
  3. Listener container factory
package com.mycompany.factory;

import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

import com.mycompany.errorhandler.ListenerContainerRecoveringBatchErrorHandler;

public class KafkaBatchListenerContainerFactory<K, V>
        extends ConcurrentKafkaListenerContainerFactory<K, V> {

    public KafkaBatchListenerContainerFactory(final DefaultKafkaConsumerFactory<K, V> consumerFactory,
            final ListenerContainerRecoveringBatchErrorHandler errorHandler, final int concurrency) {

        super.setConsumerFactory(consumerFactory);
        super.setBatchErrorHandler(errorHandler);
        super.setConcurrency(concurrency);
        super.setBatchListener(true);
        super.setAutoStartup(true);

        final ContainerProperties containerProperties = super.getContainerProperties();
        containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL);
        containerProperties.setOnlyLogRecordMetadata(true);
    }

}
  4. Batch error handler
package com.mycompany.errorhandler;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.listener.RecoveringBatchErrorHandler;
import org.springframework.stereotype.Component;
import org.springframework.util.backoff.FixedBackOff;

@Component
public class ListenerContainerRecoveringBatchErrorHandler extends RecoveringBatchErrorHandler {

    public ListenerContainerRecoveringBatchErrorHandler(
            @Value("${spring.kafka.consumer.properties.backOffMS:0}") final int backOffTimeMS,
            @Value("${spring.kafka.consumer.properties.retries:3}") final int retries) {

        super(new FixedBackOff(backOffTimeMS, retries));
    }

}
  5. Kafka listener configurer
package com.mycompany.config;

import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListenerConfigurer;
import org.springframework.kafka.config.KafkaListenerEndpointRegistrar;
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;

import com.mycompany.errorhandler.ListenerContainerRecoveringBatchErrorHandler;
import com.mycompany.factory.KafkaBatchListenerContainerFactory;
import com.mycompany.listener.KafkaBatchListener;

@Configuration
public class KafkaBatchListenerConfigurer<K, V> implements KafkaListenerConfigurer {

    private final List<KafkaBatchListener<K, V>> listeners;
    private final BeanFactory beanFactory;
    private final ListenerContainerRecoveringBatchErrorHandler errorHandler;
    private final int concurrency;

    @Autowired
    public KafkaBatchListenerConfigurer(final List<KafkaBatchListener<K, V>> listeners, final BeanFactory beanFactory,
            final ListenerContainerRecoveringBatchErrorHandler errorHandler,
            @Value("${spring.kafka.listener.concurrency:1}") final int concurrency) {
        this.listeners = listeners;
        this.beanFactory = beanFactory;
        this.errorHandler = errorHandler;
        this.concurrency = concurrency;
    }

    @Override
    public void configureKafkaListeners(final KafkaListenerEndpointRegistrar registrar) {

        final Method listenerMethod = lookUpBatchListenerMethod();

        listeners.forEach(listener -> {
            registerListenerEndpoint(listener, listenerMethod, registrar);
        });
    }

    private void registerListenerEndpoint(final KafkaBatchListener<K, V> listener, final Method listenerMethod,
            final KafkaListenerEndpointRegistrar registrar) {

        // final Map<String, Object> consumerConfig = get ConsumerConfig from a custom provider;
        registrar.setContainerFactory(createContainerFactory(consumerConfig));
        registrar.registerEndpoint(createListenerEndpoint(listener, listenerMethod, consumerConfig));
    }

    private KafkaBatchListenerContainerFactory<K, V> createContainerFactory(final Map<String, Object> consumerConfig) {

        final DefaultKafkaConsumerFactory<K, V> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerConfig);

        final KafkaBatchListenerContainerFactory<K, V> containerFactory = new KafkaBatchListenerContainerFactory<>(
                consumerFactory, errorHandler, concurrency);

        return containerFactory;
    }

    private MethodKafkaListenerEndpoint<String, String> createListenerEndpoint(final KafkaBatchListener<K, V> listener,
            final Method listenerMethod, final Map<String, Object> consumerConfig) {

        final MethodKafkaListenerEndpoint<String, String> endpoint = new MethodKafkaListenerEndpoint<>();
        endpoint.setId(UUID.randomUUID().toString());
        endpoint.setBean(listener);
        endpoint.setMethod(listenerMethod);
        endpoint.setBeanFactory(beanFactory);
        endpoint.setGroupId("my-group-id");
        endpoint.setMessageHandlerMethodFactory(new DefaultMessageHandlerMethodFactory());

        // final String topicName = get TopicName for this key-value from a custom utility;
        endpoint.setTopics(topicName);

        final Properties properties = new Properties();
        properties.putAll(consumerConfig);
        endpoint.setConsumerProperties(properties);

        return endpoint;
    }

    private Method lookUpBatchListenerMethod() {
        return Arrays.stream(com.mycompany.listener.KafkaBatchListener.class.getMethods())
                .filter(m -> m.isAnnotationPresent(com.mycompany.listener.KafkaListener.class))
                .findAny()
                .orElseThrow(() -> new IllegalStateException(
                        String.format("[%s] class should have at least 1 method with [%s] annotation.",
                                com.mycompany.listener.KafkaBatchListener.class.getCanonicalName(),
                                com.mycompany.listener.KafkaListener.class.getCanonicalName())));
    }

}

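When your listener already implements one of the message listener interfaces, you don't need all of the standard @KafkaListener method-invocation infrastructure. Instead of registering an endpoint for each listener, simply create a container from the factory for each listener and add the listener to the container properties.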

val container = containerFactory.createContainer("topic1");
container.getContainerProperties().set...
...
container.getContainerProperties().setMessageListener(myListenerInstance);
...
container.start();
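
One possible contributor to the intermittency, offered as an assumption rather than something the trace proves: lookUpBatchListenerMethod() in the question combines Class.getMethods() with findAny(). Because KafkaBatchListener<K, V> overrides a generic interface method, the compiler also emits a synthetic bridge method onMessage(Object, Acknowledgment), and javac (JDK 8 and later) copies the method annotation onto it. The order of getMethods() is unspecified, so the lookup may return either candidate, and the endpoint details in the trace do show the (Object, Acknowledgment) signature. The sketch below reproduces that shape with plain JDK types only; Marker, GenericListener, and BatchListener are hypothetical stand-ins, not Spring Kafka classes, and the isBridge() filter is just one way to make the lookup deterministic.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;

public class BridgeMethodLookup {

    // Hypothetical marker annotation, standing in for the custom one in the question.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Marker {
    }

    // Hypothetical generic listener interface; its erased signature is onMessage(Object, String).
    interface GenericListener<T> {
        void onMessage(T data, String ack);
    }

    // Overriding with T = List<K> makes the compiler add a bridge method onMessage(Object, String).
    static class BatchListener<K> implements GenericListener<List<K>> {
        @Override
        @Marker
        public void onMessage(final List<K> records, final String ack) {
            // no-op: only the reflective shape of the method matters here
        }
    }

    // Same lookup as in the question, plus a filter that skips compiler-generated bridge methods.
    static Method lookUpListenerMethod(final Class<?> type) {
        return Arrays.stream(type.getMethods())
                .filter(m -> m.isAnnotationPresent(Marker.class))
                .filter(m -> !m.isBridge())
                .findAny()
                .orElseThrow(() -> new IllegalStateException(
                        "No annotated method found on " + type.getName()));
    }

    public static void main(final String[] args) {
        // Print every annotated candidate so a bridge method, if present, is visible.
        Arrays.stream(BatchListener.class.getMethods())
                .filter(m -> m.isAnnotationPresent(Marker.class))
                .forEach(m -> System.out.println(m + " bridge=" + m.isBridge()));

        System.out.println("selected: " + lookUpListenerMethod(BatchListener.class));
    }
}
```

If the method is still looked up reflectively, filtering on isBridge(), or calling getMethod with explicit parameter types, should keep the selection stable across runs. With the container-based approach above, no method lookup is needed at all.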