Spring 云端Kafka限制单位时间消息消费
Spring cloud Kafka limit message consumption per time unit
我有一个带有 spring 云流的消费者应用程序。此应用程序正在使用来自队列 (Kafka) 的消息,并且针对每条消息,该应用程序对 4 个不同的服务器进行 4 次不同的 HTTP 调用,其中一个非常慢(10 秒响应)。当队列充满消息时,比如 6000,应用程序崩溃有几个原因(1 - netty 直接内存用完,2 - 我们正在使用反应堆,线程池变空)。
有什么方法可以通过 spring-cloud-stream 或 kafka 来限制消费者端的消费速度吗?像每秒最大消息数这样的东西会很好。
这里可以看到kafka的配置(application.yml)
spring:
kafka:
bootstrap-servers: my-cloud-kafka-instance
admin:
ssl:
protocol: SSL
properties:
security.protocol: SSL
cloud:
stream:
bindings:
input:
group: my-group
destination: my-destination
content-type: application/json
这是我的消费者(在 kotlin 中):
@Controller
@EnableBinding(Processor::class)
class MyConsumer(
myDependendies
) {
@StreamListener(Processor.INPUT)
fun myMethod(
@Headers headers: Map<String, String>,
@Payload myMessage: Message
) {
myBussinessLogic
}
}
Pollable Message Source 允许消费者控制消费率。例如,为了简单说明,我们先定义一个接口:
public interface PolledProcessor {
@Input
PollableMessageSource destIn();
@Output
MessageChannel destOut();
}
使用示例:
@Autowired
private PolledProcessor polledProcessor;
@Scheduled(fixedDelay = 5_000)
public void poll() {
polledProcessor.destIn().poll(message -> {
byte[] bytes = (byte[]) message.getPayload();
String payload = new String(bytes);
logger.info("Received: " + payload);
polledProcessor.destOut().send(MessageBuilder.withPayload(payload.toUpperCase())
.copyHeaders(message.getHeaders())
.build());
});
}
参考资源:
https://spring.io/blog/2018/02/27/spring-cloud-stream-2-0-polled-consumers
我有一个带有 spring 云流的消费者应用程序。此应用程序正在使用来自队列 (Kafka) 的消息,并且针对每条消息,该应用程序对 4 个不同的服务器进行 4 次不同的 HTTP 调用,其中一个非常慢(10 秒响应)。当队列充满消息时,比如 6000,应用程序崩溃有几个原因(1 - netty 直接内存用完,2 - 我们正在使用反应堆,线程池变空)。
有什么方法可以通过 spring-cloud-stream 或 kafka 来限制消费者端的消费速度吗?像每秒最大消息数这样的东西会很好。
这里可以看到kafka的配置(application.yml)
spring:
kafka:
bootstrap-servers: my-cloud-kafka-instance
admin:
ssl:
protocol: SSL
properties:
security.protocol: SSL
cloud:
stream:
bindings:
input:
group: my-group
destination: my-destination
content-type: application/json
这是我的消费者(在 kotlin 中):
@Controller
@EnableBinding(Processor::class)
class MyConsumer(
myDependendies
) {
@StreamListener(Processor.INPUT)
fun myMethod(
@Headers headers: Map<String, String>,
@Payload myMessage: Message
) {
myBussinessLogic
}
}
Pollable Message Source 允许消费者控制消费率。例如,为了简单说明,我们先定义一个接口:
public interface PolledProcessor {
@Input
PollableMessageSource destIn();
@Output
MessageChannel destOut();
}
使用示例:
@Autowired
private PolledProcessor polledProcessor;
@Scheduled(fixedDelay = 5_000)
public void poll() {
polledProcessor.destIn().poll(message -> {
byte[] bytes = (byte[]) message.getPayload();
String payload = new String(bytes);
logger.info("Received: " + payload);
polledProcessor.destOut().send(MessageBuilder.withPayload(payload.toUpperCase())
.copyHeaders(message.getHeaders())
.build());
});
}
参考资源:
https://spring.io/blog/2018/02/27/spring-cloud-stream-2-0-polled-consumers