Why Spring Integration QueueChannel runs sequentially with the delayed delivery message in kafka

When using the Kafka integration with a QueueChannel configured,

the processing after the queue channel receives the messages runs sequentially, with a one-second delay, and I cannot understand why. My understanding is that a QueueChannel accumulates messages (up to its configured capacity) and, as long as the queue is not empty and there is a consumer, releases messages from the queue. Why are the messages published sequentially with a one-second delay?
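To illustrate my expectation, here is a minimal sketch using a plain QueueChannel outside of any integration flow (this is just my mental model, not the flow below):

```kotlin
import org.springframework.integration.channel.QueueChannel
import org.springframework.messaging.support.GenericMessage

fun main() {
    // Messages accumulate up to the configured capacity (10 here)...
    val channel = QueueChannel(10)
    (1..3).forEach { channel.send(GenericMessage("item: $it")) }

    // ...and a consumer can drain them immediately; the channel itself
    // imposes no delay between receives.
    while (true) {
        val message = channel.receive(0) ?: break
        println(message.payload)
    }
}
```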

According to the logs, you can see that the messages are received immediately (judging by the timestamps) and then processed sequentially, with a 1 second delay:
2020-04-06 13:08:28.108 INFO 30718 --- [ntainer#0-0-C-1] o.s.integration.handler.LoggingHandler : readKafkaChannel: item: 2 - enriched
2020-04-06 13:08:28.109 INFO 30718 --- [ask-scheduler-3] o.s.integration.handler.LoggingHandler : channelThatIsProcessingSequential - item: 2 - enriched
2020-04-06 13:08:28.110 INFO 30718 --- [ntainer#0-0-C-1] o.s.integration.handler.LoggingHandler : readKafkaChannel: item: 7 - enriched
2020-04-06 13:08:28.111 INFO 30718 --- [ntainer#0-0-C-1] o.s.integration.handler.LoggingHandler : readKafkaChannel: item: 5 - enriched
2020-04-06 13:08:28.116 INFO 30718 --- [ntainer#0-1-C-1] o.s.integration.handler.LoggingHandler : readKafkaChannel: item: 6 - enriched
2020-04-06 13:08:28.119 INFO 30718 --- [ntainer#0-1-C-1] o.s.integration.handler.LoggingHandler : readKafkaChannel: item: 4 - enriched
2020-04-06 13:08:28.120 INFO 30718 --- [ntainer#0-1-C-1] o.s.integration.handler.LoggingHandler : readKafkaChannel: item: 1 - enriched
2020-04-06 13:08:28.121 INFO 30718 --- [ntainer#0-1-C-1] o.s.integration.handler.LoggingHandler : readKafkaChannel: item: 8 - enriched
2020-04-06 13:08:28.122 INFO 30718 --- [ntainer#0-1-C-1] o.s.integration.handler.LoggingHandler : readKafkaChannel: item: 3 - enriched
2020-04-06 13:08:28.123 INFO 30718 --- [ntainer#0-1-C-1] o.s.integration.handler.LoggingHandler : readKafkaChannel: item: 9 - enriched
2020-04-06 13:08:28.124 INFO 30718 --- [ntainer#0-1-C-1] o.s.integration.handler.LoggingHandler : readKafkaChannel: item: 10 - enriched
2020-04-06 13:08:29.111 INFO 30718 --- [ask-scheduler-2] o.s.integration.handler.LoggingHandler : channelThatIsProcessingSequential - item: 7 - enriched
2020-04-06 13:08:30.112 INFO 30718 --- [ask-scheduler-4] o.s.integration.handler.LoggingHandler : channelThatIsProcessingSequential - item: 5 - enriched
2020-04-06 13:08:31.112 INFO 30718 --- [ask-scheduler-1] o.s.integration.handler.LoggingHandler : channelThatIsProcessingSequential - item: 6 - enriched
2020-04-06 13:08:32.113 INFO 30718 --- [ask-scheduler-5] o.s.integration.handler.LoggingHandler : channelThatIsProcessingSequential - item: 4 - enriched
2020-04-06 13:08:33.113 INFO 30718 --- [ask-scheduler-3] o.s.integration.handler.LoggingHandler : channelThatIsProcessingSequential - item: 1 - enriched
2020-04-06 13:08:34.113 INFO 30718 --- [ask-scheduler-3] o.s.integration.handler.LoggingHandler : channelThatIsProcessingSequential - item: 8 - enriched
2020-04-06 13:08:35.113 INFO 30718 --- [ask-scheduler-3] o.s.integration.handler.LoggingHandler : channelThatIsProcessingSequential - item: 3 - enriched
2020-04-06 13:08:36.114 INFO 30718 --- [ask-scheduler-3] o.s.integration.handler.LoggingHandler : channelThatIsProcessingSequential - item: 9 - enriched
2020-04-06 13:08:37.114 INFO 30718 --- [ask-scheduler-3] o.s.integration.handler.LoggingHandler : channelThatIsProcessingSequential - item: 10 - enriched


package br.com.gubee.kafaexample

import org.apache.kafka.clients.admin.NewTopic
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.http.MediaType
import org.springframework.integration.annotation.Gateway
import org.springframework.integration.annotation.MessagingGateway
import org.springframework.integration.config.EnableIntegration
import org.springframework.integration.context.IntegrationContextUtils
import org.springframework.integration.dsl.IntegrationFlow
import org.springframework.integration.dsl.IntegrationFlows
import org.springframework.integration.kafka.dsl.Kafka
import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.listener.ContainerProperties
import org.springframework.scheduling.annotation.Async
import org.springframework.stereotype.Component
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.PathVariable
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RestController


@RestController
@RequestMapping(path = ["/testKafka"], produces = [MediaType.APPLICATION_JSON_VALUE])
class TestKafkaResource(private val testKafkaGateway: TestKafkaGateway) {

    @GetMapping("init/{param}")
    fun init(@PathVariable("param", required = false) param: String? = null) {
        (1..10).forEach {
            println("Send async item $it")
            testKafkaGateway.init("item: $it")
        }
    }

}

@MessagingGateway(errorChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
@Component
interface TestKafkaGateway {
    @Gateway(requestChannel = "publishKafkaChannel")
    @Async
    fun init(param: String)
}

@Configuration
@EnableIntegration
class TestKafkaFlow(private val kafkaTemplate: KafkaTemplate<*, *>,
                    private val consumerFactory: ConsumerFactory<*, *>) {

    @Bean
    fun readKafkaChannelTopic(): NewTopic {
        // NewTopic(name, numPartitions, replicationFactor) takes a Short replication factor
        return NewTopic("readKafkaChannel", 40, 1.toShort())
    }

    @Bean
    fun publishKafka(): IntegrationFlow {
        return IntegrationFlows
                .from("publishKafkaChannel")
                .transform<String, String> { "${it} - enriched" }
                .handle(
                        Kafka.outboundChannelAdapter(kafkaTemplate)
                                .topic("readKafkaChannel")
                                .sendFailureChannel(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
                )
                .get()
    }

    @Bean
    fun readFromKafka(): IntegrationFlow {
        return IntegrationFlows
                .from(
                        Kafka.messageDrivenChannelAdapter(consumerFactory, "readKafkaChannel")
                                .configureListenerContainer { kafkaMessageListenerContainer ->
                                    kafkaMessageListenerContainer.concurrency(2)
                                    kafkaMessageListenerContainer.ackMode(ContainerProperties.AckMode.RECORD)
                                }
                                .errorChannel(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
                )
                .channel { c -> c.queue(10) }
                .log<String> {
                    "readKafkaChannel: ${it.payload}"
                }
                .channel("channelThatIsProcessingSequential")
                .get()
    }

    @Bean
    fun kafkaFlowAfter(): IntegrationFlow {
        return IntegrationFlows
                .from("channelThatIsProcessingSequential")
                .log<String> {
                    "channelThatIsProcessingSequential - ${it.payload}"
                }
                .get()
    }
}

As Gary said, it is not good to shift Kafka messages into a QueueChannel. Consumption on the Kafka.messageDrivenChannelAdapter() is already async - there is really no reason to move messages to a separate thread.
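As a sketch of that suggestion, here is the question's readFromKafka flow with the queue-channel hand-off removed, so records are processed directly on the listener container threads (bean and channel names are taken from the question's code):

```kotlin
@Bean
fun readFromKafka(): IntegrationFlow {
    return IntegrationFlows
            .from(
                    Kafka.messageDrivenChannelAdapter(consumerFactory, "readKafkaChannel")
                            .configureListenerContainer { container ->
                                container.concurrency(2)
                                container.ackMode(ContainerProperties.AckMode.RECORD)
                            }
                            .errorChannel(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
            )
            // no .channel { c -> c.queue(10) } hand-off: processing stays on
            // the Kafka consumer threads, so no poller is involved at all
            .log<String> { "readKafkaChannel: ${it.payload}" }
            .channel("channelThatIsProcessingSequential")
            .get()
}
```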

Anyway, it looks like you are using Spring Cloud Stream, whose PollerMetadata is configured with a 1 message per second polling policy.
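That default can be overridden with a global PollerMetadata bean inside a @Configuration class; a sketch (the fixedDelay and maxMessagesPerPoll values here are illustrative, not from the question):

```kotlin
@Bean(PollerMetadata.DEFAULT_POLLER)
fun defaultPoller(): PollerMetadata =
        Pollers.fixedDelay(100)          // poll every 100 ms instead of every second
                .maxMessagesPerPoll(10)  // drain up to 10 messages per poll
                .get()
```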

If that doesn't fit your requirements, you can always change that .channel { c -> c.queue(10) } to use a second lambda and configure a custom poller there.
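A sketch of that option, assuming the custom poller is configured on the endpoint that consumes from the queue channel (the handler body and the poller values are illustrative):

```kotlin
@Bean
fun kafkaFlowAfter(): IntegrationFlow {
    return IntegrationFlows
            .from("channelThatIsProcessingSequential")
            // the endpoint-spec lambda attaches a poller that drains the
            // queue channel faster than the 1-message-per-second default
            .handle<String>({ payload, _ ->
                println("channelThatIsProcessingSequential - $payload")
                null // end of flow
            }) { e -> e.poller { p -> p.fixedDelay(100).maxMessagesPerPoll(10) } }
            .get()
}
```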

By the way, we already have a Kotlin DSL implemented in Spring Integration: https://docs.spring.io/spring-integration/docs/5.3.0.M4/reference/html/kotlin-dsl.html#kotlin-dsl
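For illustration, the question's kafkaFlowAfter bean might look like this with that Kotlin DSL (a sketch against the 5.3 API; the integrationFlow builder function replaces IntegrationFlows.from(...).get()):

```kotlin
@Bean
fun kafkaFlowAfter() = integrationFlow("channelThatIsProcessingSequential") {
    log<String> { "channelThatIsProcessingSequential - ${it.payload}" }
}
```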