如何处理Polled Consumer(PollableMessageSource)中的批记录?
How to process batch records in Polled Consumer(PollableMessageSource )?
我在 spring 云流中使用 PolledConsumer。
我的消费者看起来像这样:
@Bean
public ApplicationRunner runner(PollableMessageSource input, MessageChannel output) {
return args -> {
System.out.println("Send some messages to topic polledConsumerIn and receive from polledConsumerOut");
System.out.println("Messages will be processed one per second");
exec.execute(() -> {
boolean result = false;
while (true) {
// this is where we poll for a message, process it, and send a new one
result = input.poll(m -> {
String payload = (String) m.getPayload();
System.out.println("Received: " + payload);
output.send(MessageBuilder.withPayload(payload.toUpperCase())
.copyHeaders(m.getHeaders())
.build());
}, new ParameterizedTypeReference<String>() { });
try {
Thread.sleep(1_000);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
if (result) {
System.out.println("Success");
}
}
});
};
}
我正在尝试以批处理模式使用记录,我的目标是在轮询后获取记录列表,因为我的方法是批处理。
在代码中 input.poll 方法采用 MessageHandler,它采用单条记录作为参数。
在我这样设置配置之后:
绑定:
人:
消费者:
批处理模式:true
binder:
configuration:
max.poll.records: 1500
fetch.min.bytes: 900000
fetch.max.wait.ms: 5000
value.deserializer: tr.PersonDeserializer
结果还是一样
有什么方法可以处理 MessageHandler 中的记录列表,这意味着 m.getPayload 的类型是 List<>?
polled consumer暂不支持批量消费
我在 spring 云流中使用 PolledConsumer。 我的消费者看起来像这样:
@Bean
public ApplicationRunner runner(PollableMessageSource input, MessageChannel output) {
return args -> {
System.out.println("Send some messages to topic polledConsumerIn and receive from polledConsumerOut");
System.out.println("Messages will be processed one per second");
exec.execute(() -> {
boolean result = false;
while (true) {
// this is where we poll for a message, process it, and send a new one
result = input.poll(m -> {
String payload = (String) m.getPayload();
System.out.println("Received: " + payload);
output.send(MessageBuilder.withPayload(payload.toUpperCase())
.copyHeaders(m.getHeaders())
.build());
}, new ParameterizedTypeReference<String>() { });
try {
Thread.sleep(1_000);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
if (result) {
System.out.println("Success");
}
}
});
};
}
我正在尝试以批处理模式使用记录,我的目标是在轮询后获取记录列表,因为我的方法是批处理。 在代码中 input.poll 方法采用 MessageHandler,它采用单条记录作为参数。 在我这样设置配置之后: 绑定: 人: 消费者: 批处理模式:true
binder:
configuration:
max.poll.records: 1500
fetch.min.bytes: 900000
fetch.max.wait.ms: 5000
value.deserializer: tr.PersonDeserializer
结果还是一样
有什么方法可以处理 MessageHandler 中的记录列表,这意味着 m.getPayload 的类型是 List<>?
polled consumer暂不支持批量消费