Some commit offset calls are failing in Kafka
The Kafka server and client jars have been moved to the latest library: 0.10.0.1.
My consumer and producer code uses the latest Kafka jars mentioned above, but still uses the old consumer APIs (0.8.2).
I am facing a problem on the consumer side while calling commit offset.
2017-04-10 00:07:14,547 ERROR kafka.consumer.ZookeeperConsumerConnector [groupid1_x.x.x.x-1491594443834-b0e2bce5], Error while committing offsets. java.io.EOFException
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120)
at kafka.consumer.ZookeeperConsumerConnector.liftedTree2(ZookeeperConsumerConnector.scala:354)
at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:351)
at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:331)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:111)
at com.zoho.mysqlbackup.kafka.consumer.KafkaHLConsumer.commitOffset(KafkaHLConsumer.java:173)
at com.zoho.mysqlbackup.kafka.consumer.KafkaHLConsumer.run(KafkaHLConsumer.java:271)
Kafka server-side configuration:
listeners=PLAINTEXT://:9092
log.message.format.version=0.8.2.1
broker.id.generation.enable=false
unclean.leader.election.enable=false
Kafka consumer configuration:
auto.commit.enable is overridden to false
auto.offset.reset is overridden to smallest
consumer.timeout.ms is overridden to 100
dual.commit.enabled is overridden to true
fetch.message.max.bytes is overridden to 209715200
group.id is overridden to crm_topic1_hadoop_tables
offsets.storage is overridden to kafka
rebalance.backoff.ms is overridden to 6000
zookeeper.session.timeout.ms is overridden to 23000
zookeeper.sync.time.ms is overridden to 2000
To create the consumer, I use the following API:
ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(propForHLConsumer));
and for the commit call:
consumer.commitOffsets();
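For reference, a minimal sketch of how propForHLConsumer above could be assembled from the overrides listed earlier; the zookeeper.connect address is a placeholder I am adding for illustration, not part of the original configuration:

import java.util.Properties;

import kafka.consumer.ConsumerConfig;

// Hypothetical assembly of the consumer properties shown in the log output above.
Properties propForHLConsumer = new Properties();
propForHLConsumer.put("zookeeper.connect", "zk1:2181");          // placeholder ZooKeeper address
propForHLConsumer.put("group.id", "crm_topic1_hadoop_tables");
propForHLConsumer.put("auto.commit.enable", "false");
propForHLConsumer.put("auto.offset.reset", "smallest");
propForHLConsumer.put("consumer.timeout.ms", "100");
propForHLConsumer.put("dual.commit.enabled", "true");
propForHLConsumer.put("offsets.storage", "kafka");
propForHLConsumer.put("fetch.message.max.bytes", "209715200");
propForHLConsumer.put("rebalance.backoff.ms", "6000");
propForHLConsumer.put("zookeeper.session.timeout.ms", "23000");
propForHLConsumer.put("zookeeper.sync.time.ms", "2000");

ConsumerConfig config = new ConsumerConfig(propForHLConsumer);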
While reading messages from Kafka, we use the following method to handle the consumer timeout:
private boolean hasNext(ConsumerIterator<byte[], byte[]> it)
{
    try
    {
        it.hasNext();
        return true;
    }
    catch (ConsumerTimeoutException e)
    {
        // consumer.timeout.ms (100 ms here) elapsed without a new message
        return false;
    }
}
This is required because we want to start processing only after a specific time interval has passed or a certain size (in bytes) of messages has been received from Kafka, roughly as in the sketch below.
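To make the flow concrete, the read/commit loop looks roughly like this sketch; the stream variable, the batch limits, and processAndPushToHadoop are placeholders added for illustration, not the actual code, and consumer refers to the connector created above:

import java.util.ArrayList;
import java.util.List;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;

// Hypothetical read loop: accumulate messages until a size or time limit is hit,
// then write the batch to Hadoop and commit the offsets.
private void consumeLoop(KafkaStream<byte[], byte[]> stream)
{
    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    List<byte[]> batch = new ArrayList<byte[]>();
    long batchBytes = 0;
    long batchStart = System.currentTimeMillis();
    final long maxBatchBytes = 100L * 1024 * 1024;   // assumed size limit
    final long maxBatchMillis = 5L * 60 * 1000;      // assumed time limit

    while (true)
    {
        if (hasNext(it))                             // swallows ConsumerTimeoutException
        {
            byte[] message = it.next().message();
            batch.add(message);
            batchBytes += message.length;
        }
        boolean limitReached = batchBytes >= maxBatchBytes
                || System.currentTimeMillis() - batchStart >= maxBatchMillis;
        if (limitReached && !batch.isEmpty())
        {
            processAndPushToHadoop(batch);           // placeholder: the ~5 minute Hadoop write
            consumer.commitOffsets();                // the call that later fails with EOFException
            batch.clear();
            batchBytes = 0;
            batchStart = System.currentTimeMillis();
        }
    }
}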
I get the same exception even after setting
dual.commit.enabled = false
consumer.timeout.ms = 1000
with the other settings kept as in the old configuration.
More details:
With version 0.8.2.1, I never faced such a problem. After moving to 0.10.0.1 (client as well as server), I started getting this exception.
We read multiple messages before processing/pushing them to Hadoop. The processing/writing to Hadoop takes time (~5 minutes), and after this processing, when we try to commit, we get the above exception. I get this exception on every 2nd commitOffset. And sometimes (when commitOffset is called within 10 seconds of the previous commit) there is no exception on the 2nd commit.
For your information: if the commit offset call fails, the consumer just keeps reading the next messages without going back to the last successfully committed offset position. But if the commit offset call fails and the consumer process is restarted, then it reads from the old committed position.
As I mentioned in the question details, I am using the latest Kafka jars but am still using the old consumer client:
kafka.javaapi.consumer.ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(propForHLConsumer));
I solved this problem by calling commitOffset a second time.
The problem is actually related to connections.max.idle.ms. This property was introduced with the latest Kafka (broker = 10 minutes, consumer = 9 minutes, producer = 9 minutes).
So whenever my old consumer calls commit offset a second time more than 10 minutes after the previous one, I get the exception.
With the old consumer API, there is no way to set this property. And I cannot change the broker configuration (it is handled by another team, and the same brokers serve other users)...
Here I believe the old commitOffset call uses a separate connection (other than the iterator's), and that connection is being closed once it has been idle for more than 10 minutes. I am not completely sure about this.
If the first commitOffset call fails, the second call makes sure it succeeds. If the first one succeeds, the next execution has no problem either. In any case, we call commit offset only rarely, so the extra call is cheap; see the sketch below.
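A minimal sketch of the workaround; the wrapper name commitOffsetTwice is mine, not from the original code:

import kafka.javaapi.consumer.ConsumerConnector;

// Hypothetical wrapper around the existing commit call.
private void commitOffsetTwice(ConsumerConnector consumer)
{
    consumer.commitOffsets(); // may hit the connection closed after connections.max.idle.ms
    consumer.commitOffsets(); // goes out on a freshly opened connection and succeeds
}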
Next, I will move my code to the latest Kafka consumer and producer Java APIs.
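For reference, with the new consumer API in 0.10.0.1 (org.apache.kafka.clients.consumer.KafkaConsumer), connections.max.idle.ms is exposed as a client setting and commits are done with commitSync(); a minimal sketch with placeholder broker and topic names:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class NewApiConsumerSketch
{
    public static void main(String[] args)
    {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");      // placeholder broker address
        props.put("group.id", "crm_topic1_hadoop_tables");
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "earliest");          // "smallest" in the old API
        props.put("connections.max.idle.ms", "1800000");     // now settable on the client
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(props);
        consumer.subscribe(Collections.singletonList("topic1"));  // placeholder topic

        while (true)
        {
            ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
            for (ConsumerRecord<byte[], byte[]> record : records)
            {
                // process record.value() ...
            }
            consumer.commitSync();  // replaces the old consumer.commitOffsets()
        }
    }
}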