How to set concurrency (or other configurations) for ConcurrentKafkaListenerContainerFactory per StreamListener

Our application (Spring Boot, based on Spring Cloud Stream) listens to multiple Kafka topics (TOPIC_A with 3 partitions, TOPIC_B with 1 partition, TOPIC_C with 10 partitions), so it has 3 @StreamListener methods.

    @StreamListener(TopicASink.INPUT)
    public void processTopicA(Message<String> msg) {
        logger.info("** received message: {} ", msg.getPayload());
        // do some processing
    }

    @StreamListener(TopicBSink.INPUT)
    public void processTopicB(Message<String> msg) {
        logger.info("** received message: {} ", msg.getPayload());
        // do some processing
    }

    @StreamListener(TopicCSink.INPUT)
    public void processTopicC(Message<String> msg) {
        logger.info("** received message: {} ", msg.getPayload());
        // do some processing
    }

We need custom error handling and a retry mechanism, so we implemented them by configuring a ConcurrentKafkaListenerContainerFactory bean.

       @Bean
        public ConcurrentKafkaListenerContainerFactory<Object, Object> concurrentKafkaListenerContainerFactory(ConsumerFactory<Object,Object> consumerFactory) {
            ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConcurrency(2);   // we need to customize this per topic based on number of partitions
            factory.setConsumerFactory(consumerFactory);
            RetryTemplate retryTemplate = new RetryTemplate();
            retryTemplate.setRetryPolicy(new SimpleRetryPolicy(10));
            factory.setRetryTemplate(retryTemplate);
            factory.setErrorHandler(new SeekToCurrentErrorHandler(new FixedBackOff(FixedBackOff.DEFAULT_INTERVAL, 10)));
            return factory;
        }

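The problem is that we now need some KafkaListenerContainer properties to vary per @StreamListener (that is, per topic in this case). For example, we want a concurrency of 3 for TOPIC_A and 10 for TOPIC_C instead of one common concurrency set on the factory, or a SeekToCurrentErrorHandler for TOPIC_A and TOPIC_C but not for TOPIC_B (or a different ErrorHandler for some topics).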

How can this be achieved at the per-container level?


Stack trace after trying the reflection-based solution shared below:

o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessagingException: Exception thrown while invoking com.jta.poc.kafkapoc.KafkaStreamPocApplication$MessageProcessor#processInput[1 args]; nested exception is com.jta.poc.kafkapoc.MyNewRetryableException, failedMessage=GenericMessage [payload=byte[35], headers={kafka_timestampType=CREATE_TIME, kafka_receivedTopic=new_input_topic, spanTraceId=e3382bf49eaa5343, spanId=e3382bf49eaa5343, nativeHeaders={spanTraceId=[e3382bf49eaa5343], spanId=[efc90644fc4c7dee], spanSampled=[0], X-B3-TraceId=[e3382bf49eaa5343], X-B3-SpanId=[efc90644fc4c7dee], X-B3-ParentSpanId=[e3382bf49eaa5343], spanParentSpanId=[e3382bf49eaa5343], X-B3-Sampled=[0]}, kafka_offset=26, X-B3-SpanId=e3382bf49eaa5343, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2a011bf8, X-B3-Sampled=0, X-B3-TraceId=e3382bf49eaa5343, id=3c86f652-f16e-2f59-1a59-f3d8601849f0, kafka_receivedPartitionId=1, spanSampled=0, kafka_receivedTimestamp=1586250896206, kafka_acknowledgment=Acknowledgment for ConsumerRecord(topic = new_input_topic, partition = 1, offset = 26, CreateTime = 1586250896206, serialized key size = -1, serialized value size = 35, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = [B@68df9f80), contentType=application/json, timestamp=1586274368357}]
    at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:63)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:203)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access0(KafkaMessageDrivenChannelAdapter.java:70)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:387)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:364)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:120)
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1071)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1051)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:998)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:866)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:724)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.jta.poc.kafkapoc.MyNewRetryableException
    at com.jta.poc.kafkapoc.KafkaStreamPocApplication$MessageProcessor.consumeMessage(KafkaStreamPocApplication.java:164)
    at com.jta.poc.kafkapoc.KafkaStreamPocApplication$MessageProcessor.lambda$processInput$0(KafkaStreamPocApplication.java:107)
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:164)
    at com.jta.poc.kafkapoc.KafkaStreamPocApplication$MessageProcessor.processInput(KafkaStreamPocApplication.java:105)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:181)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:114)
    at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55)
    ... 29 more

The container factory is not used in this context.

Add a ListenerContainerCustomizer @Bean.

@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> cust() {
    return (container, destination, group) -> { ... };
}

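As you can see, you get a reference to the container, the destination name, and the group, so you can determine which binding the customizer is being invoked for.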

/**
 * If a single bean of this type is in the application context, listener containers
 * created by the binder can be further customized after all the properties are set. For
 * example, to configure less-common properties.
 *
 * @param <T> container type
 * @author Gary Russell
 * @author Oleg Zhurakousky
 * @since 2.1
 */
@FunctionalInterface
public interface ListenerContainerCustomizer<T> {

    /**
     * Configure the container that is being created for the supplied queue name and
     * consumer group.
     * @param container the container.
     * @param destinationName the destination name.
     * @param group the consumer group.
     */
    void configure(T container, String destinationName, String group);

}

Set the error handler, etc., on the container.
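To illustrate how a single customizer can apply different settings per destination, here is a minimal sketch. It deliberately uses stand-in types so it runs without Spring on the classpath: the nested `ListenerContainerCustomizer` only copies the shape of the interface quoted above, and `FakeContainer`, `customizer()`, `configured()`, the `"example-group"` group and the string-valued error handler are all hypothetical names invented for this example. The topic names and partition counts come from the question.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Stand-in sketch of per-destination container customization (no Spring needed). */
class PerDestinationCustomizerSketch {

    /** Same shape as the ListenerContainerCustomizer interface quoted above. */
    @FunctionalInterface
    interface ListenerContainerCustomizer<T> {
        void configure(T container, String destinationName, String group);
    }

    /** Hypothetical stand-in for a listener container; it only records settings. */
    static class FakeContainer {
        int concurrency = 1;
        String errorHandler = "default";

        void setConcurrency(int concurrency) { this.concurrency = concurrency; }
        void setErrorHandler(String errorHandler) { this.errorHandler = errorHandler; }
    }

    /** Builds a customizer that picks settings from the destination (topic) name. */
    static ListenerContainerCustomizer<FakeContainer> customizer() {
        // Assumed mapping: one consumer thread per partition, as in the question.
        Map<String, Integer> concurrencyByTopic = new HashMap<>();
        concurrencyByTopic.put("TOPIC_A", 3);
        concurrencyByTopic.put("TOPIC_B", 1);
        concurrencyByTopic.put("TOPIC_C", 10);

        // Topics that should get the seek-to-current style handler.
        Set<String> seekToCurrentTopics = new HashSet<>(Arrays.asList("TOPIC_A", "TOPIC_C"));

        return (container, destinationName, group) -> {
            // Destinations not in the map keep the container's existing concurrency.
            Integer concurrency = concurrencyByTopic.get(destinationName);
            if (concurrency != null) {
                container.setConcurrency(concurrency);
            }
            if (seekToCurrentTopics.contains(destinationName)) {
                container.setErrorHandler("seekToCurrent");
            }
        };
    }

    /** Applies the customizer to a fresh stand-in container for one destination. */
    static FakeContainer configured(String destinationName) {
        FakeContainer container = new FakeContainer();
        customizer().configure(container, destinationName, "example-group");
        return container;
    }

    public static void main(String[] args) {
        for (String topic : new String[] {"TOPIC_A", "TOPIC_B", "TOPIC_C"}) {
            FakeContainer c = configured(topic);
            System.out.println(topic + " -> concurrency=" + c.concurrency
                    + ", errorHandler=" + c.errorHandler);
        }
    }
}
```

In a real application the lambda body would go into the `cust()` bean shown earlier, operating on the actual container instead of `FakeContainer`. Calling `setConcurrency` there would presumably require casting the container to `ConcurrentMessageListenerContainer`, which is the type the reflection snippet in this answer casts to.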

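EDIT

Here is a trick for 2.0.x, if you don't mind using reflection; but bear in mind that the SeekToCurrentErrorHandler (STCEH) did not support a BackOff back then.

Also, Boot 2.0 is end-of-life and has not been supported since April of last year, so you really should upgrade.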

    @Bean
    public SmartLifecycle bindingFixer(BindingService bindingService) {
        return new SmartLifecycle() {

            @Override
            public int getPhase() {
                return Integer.MAX_VALUE;
            }

            @Override
            public void stop() {
                // no op
            }

            @Override
            public void start() {
                // Read the binding service's private "consumerBindings" field via reflection.
                @SuppressWarnings("unchecked")
                Map<String, Binding> consumers = (Map<String, Binding>) new DirectFieldAccessor(bindingService)
                        .getPropertyValue("consumerBindings");
                SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler();
                // "input" is the binding name; dig out its listener container and
                // set the error handler on the container properties.
                ((ConcurrentMessageListenerContainer<?, ?>) new DirectFieldAccessor(consumers.get("input"))
                        .getPropertyValue("lifecycle.messageListenerContainer")).getContainerProperties()
                            .setErrorHandler(errorHandler);
            }

            @Override
            public boolean isRunning() {
                return false;
            }

            @Override
            public void stop(Runnable callback) {
                callback.run();
            }

            @Override
            public boolean isAutoStartup() {
                return true;
            }
        };
    }