批量消费骆驼kafka
Batch consumer camel kafka
尽管遵循此处发布的示例,但我无法与 kafka 骆驼消费者一起批量阅读。我是否需要对我的生产者进行更改,或者我的消费者配置最有可能出现问题?
有问题的应用程序利用 kafka camel component 从休息端点提取消息,验证它们,并将它们放在主题上。然后我有一个单独的服务从主题中使用它们并将它们保存在时间序列数据库中。
一次生成和使用一条消息,但数据库希望批量使用和提交消息以获得最佳性能。在不接触生产者的情况下,我尝试调整消费者以匹配此问题答案中的示例:
How to transactionally poll Kafka from Camel?
我不确定消息会如何显示,所以现在我只是记录它们:
from(kafkaReadingConsumerEndpoint).routeId("rawReadingsConsumer").process(exchange -> {
// simple approach to generating errors
String body = exchange.getIn().getBody(String.class);
if (body.startsWith("error")) {
throw new RuntimeException("can't handle the message");
}
log.info("BODY:{}", body);
}).process(kafkaOffsetManager);
但邮件似乎仍然一次收到一封,没有批量读取。
我的消费者配置是这样的:
kafka:
host: myhost
port: myport
consumer:
seekTo: beginning
maxPartitionFetchBytes: 55000
maxPollRecords: 50
consumerCount: 1
autoOffsetReset: earliest
autoCommitEnable: false
allowManualCommit: true
breakOnFirstError: true
我的配置是否需要工作,或者我是否需要对生产者进行更改才能使其正常工作?
在最底层,KafkaConsumer#poll
方法是去return一个Iterator<ConsumerRecord>
;没有办法解决这个问题。
我没有 in-depth 使用 Camel 的经验,但为了获得“一批”记录,您需要一些中间集合来“排队”您最终要发送的数据一些“收集消费者”过程的下游。然后你将需要一些“切换”处理器,它会说“等待,处理这批”或“继续填充这批”。
就数据库而言,这个过程正是 Kafka Connect JDBC Sink 使用 batch.size
配置所做的。
尽管遵循此处发布的示例,但我无法与 kafka 骆驼消费者一起批量阅读。我是否需要对我的生产者进行更改,或者我的消费者配置最有可能出现问题?
有问题的应用程序利用 kafka camel component 从休息端点提取消息,验证它们,并将它们放在主题上。然后我有一个单独的服务从主题中使用它们并将它们保存在时间序列数据库中。
一次生成和使用一条消息,但数据库希望批量使用和提交消息以获得最佳性能。在不接触生产者的情况下,我尝试调整消费者以匹配此问题答案中的示例:
How to transactionally poll Kafka from Camel?
我不确定消息会如何显示,所以现在我只是记录它们:
from(kafkaReadingConsumerEndpoint).routeId("rawReadingsConsumer").process(exchange -> {
// simple approach to generating errors
String body = exchange.getIn().getBody(String.class);
if (body.startsWith("error")) {
throw new RuntimeException("can't handle the message");
}
log.info("BODY:{}", body);
}).process(kafkaOffsetManager);
但邮件似乎仍然一次收到一封,没有批量读取。
我的消费者配置是这样的:
kafka:
host: myhost
port: myport
consumer:
seekTo: beginning
maxPartitionFetchBytes: 55000
maxPollRecords: 50
consumerCount: 1
autoOffsetReset: earliest
autoCommitEnable: false
allowManualCommit: true
breakOnFirstError: true
我的配置是否需要工作,或者我是否需要对生产者进行更改才能使其正常工作?
在最底层,KafkaConsumer#poll
方法是去return一个Iterator<ConsumerRecord>
;没有办法解决这个问题。
我没有 in-depth 使用 Camel 的经验,但为了获得“一批”记录,您需要一些中间集合来“排队”您最终要发送的数据一些“收集消费者”过程的下游。然后你将需要一些“切换”处理器,它会说“等待,处理这批”或“继续填充这批”。
就数据库而言,这个过程正是 Kafka Connect JDBC Sink 使用 batch.size
配置所做的。