Kafka消费者在循环消费消息时丢失消息
Kafka consumer missing messages while consuming messages in loop
由于内存限制,我运行循环使用我的消费者代码,提交我的数据,然后加载到表中
以下是将 运行 循环
的代码
// here is the main part of the component,
// a piece of code executed in the row
// loop
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
System.out.println("Consumer created");
consumer.subscribe(Arrays.asList(topic));
System.out.println("Subscribed to topic " + topic);
try {
while (pollFlag) {
ConsumerRecords<String, String> records = consumer.poll(context.consumer_polltime);
if (records.isEmpty()) {
globalMap.put("emptyRecordsFlag",false); //Passing the flag value to previous component to end loop
break;
}
for (ConsumerRecord<String, String> record : records) {
listPayload.add(record.value()); // Adding the messages to list
i++;
if(i>=msgbtch)
{
pollFlag = false; // Assigning flag value to end the poll at 5000 messages
break;
}
}
}
globalMap.put("ConsumerObj",consumer);
} catch (Exception e) {
System.out.println("Error Consuming Msg: " + e);
// TODO: handle exception
//consumer.close();
}
row3.payload= String.valueOf(listPayload); // Passing the message data to next component
System.out.println("Committing");
consumer.commitSync();
System.out.println("Closing");
consumer.close();
但由于某种原因,我似乎遗漏了一些消息。我相信这与消费者有关 rebalancing/Committing.
如何检查我的消费者是否准备好从一开始就消费下一批消息而不遗漏任何消息?
更新:
我能够自己弄清楚这个问题。消息已经下载到记录中,并且循环播放,因为我设置了以下条件
if(i>=msgbtch)
{
pollFlag = false; // Assigning flag value to end the poll at 5000 messages
break;
}
甚至在将所有消息放入列表之前,循环就中断了,并且来自记录的所有消息都没有插入到列表中。我已经删除了中断条件并且它工作正常
由于内存限制,我运行循环使用我的消费者代码,提交我的数据,然后加载到表中
以下是将 运行 循环
的代码// here is the main part of the component,
// a piece of code executed in the row
// loop
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
System.out.println("Consumer created");
consumer.subscribe(Arrays.asList(topic));
System.out.println("Subscribed to topic " + topic);
try {
while (pollFlag) {
ConsumerRecords<String, String> records = consumer.poll(context.consumer_polltime);
if (records.isEmpty()) {
globalMap.put("emptyRecordsFlag",false); //Passing the flag value to previous component to end loop
break;
}
for (ConsumerRecord<String, String> record : records) {
listPayload.add(record.value()); // Adding the messages to list
i++;
if(i>=msgbtch)
{
pollFlag = false; // Assigning flag value to end the poll at 5000 messages
break;
}
}
}
globalMap.put("ConsumerObj",consumer);
} catch (Exception e) {
System.out.println("Error Consuming Msg: " + e);
// TODO: handle exception
//consumer.close();
}
row3.payload= String.valueOf(listPayload); // Passing the message data to next component
System.out.println("Committing");
consumer.commitSync();
System.out.println("Closing");
consumer.close();
但由于某种原因,我似乎遗漏了一些消息。我相信这与消费者有关 rebalancing/Committing.
如何检查我的消费者是否准备好从一开始就消费下一批消息而不遗漏任何消息?
更新: 我能够自己弄清楚这个问题。消息已经下载到记录中,并且循环播放,因为我设置了以下条件
if(i>=msgbtch)
{
pollFlag = false; // Assigning flag value to end the poll at 5000 messages
break;
}
甚至在将所有消息放入列表之前,循环就中断了,并且来自记录的所有消息都没有插入到列表中。我已经删除了中断条件并且它工作正常