Spring 云端Kafka限制单位时间消息消费

Spring cloud Kafka limit message consumption per time unit

我有一个带有 spring 云流的消费者应用程序。此应用程序正在使用来自队列 (Kafka) 的消息,并且针对每条消息,该应用程序对 4 个不同的服务器进行 4 次不同的 HTTP 调用,其中一个非常慢(10 秒响应)。当队列充满消息时,比如 6000,应用程序崩溃有几个原因(1 - netty 直接内存用完,2 - 我们正在使用反应堆,线程池变空)。

有什么方法可以通过 spring-cloud-stream 或 kafka 来限制消费者端的消费速度吗?像每秒最大消息数这样的东西会很好。

这里可以看到kafka的配置(application.yml)

spring:
  kafka:
    bootstrap-servers: my-cloud-kafka-instance
    admin:
      ssl:
        protocol: SSL
    properties:
      security.protocol: SSL
  cloud:
    stream:
      bindings:
        input:
          group: my-group
          destination: my-destination
          content-type: application/json

这是我的消费者(在 kotlin 中):

@Controller
@EnableBinding(Processor::class)
class MyConsumer(
        myDependendies
) {

    @StreamListener(Processor.INPUT)
    fun myMethod(
            @Headers headers: Map<String, String>,
            @Payload myMessage: Message
    ) {
        myBussinessLogic
    }
}

Pollable Message Source 允许消费者控制消费率。例如,为了简单说明,我们先定义一个接口:

public interface PolledProcessor {

    @Input
    PollableMessageSource destIn();

    @Output
    MessageChannel destOut();

}

使用示例:

@Autowired
private PolledProcessor polledProcessor;


@Scheduled(fixedDelay = 5_000)
public void poll() {
    polledProcessor.destIn().poll(message -> {
        byte[] bytes = (byte[]) message.getPayload();
        String payload = new String(bytes);
        logger.info("Received: " + payload);
        polledProcessor.destOut().send(MessageBuilder.withPayload(payload.toUpperCase())
                .copyHeaders(message.getHeaders())
                .build());
    });
}

参考资源:

https://spring.io/blog/2018/02/27/spring-cloud-stream-2-0-polled-consumers