将应用程序从 Kafka 0.8.2.1 移植到 Kafka 0.9.0。读数偏移问题
Porting app from Kafka 0.8.2.1 to Kafka 0.9.0. Reading offsets issue
我们遇到了与我们的应用程序代码从 Apache Kafka 版本 0.8.2.1 迁移到 0.9.0.0 相关的问题。
在这种情况下,我们指的是 Cloudera 发布的 Kafka 版本:
kafka_2.10-0.8.2.0-kafka-1.3.2
kafka_2.11-0.9.0-kafka-2.0.2
我们在读取和写入 __consumer_offsets 元数据主题上的偏移量时检测到问题。
特别是,我们使用 BlockingChannel 连接到 Kafka Broker,在调用 receive() 方法时我们得到一个 EOFException。
特别是:
java.io.EOFException
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel (NetworkReceive.java:83)
at kafka.network.BlockingChannel.readCompletely (BlockingChannel.scala: 129)
at kafka.network.BlockingChannel.receive (BlockingChannel.scala: 120)
一个可能的原因可能是两个版本的 Kafka 之间的差异 API。
卡夫卡 0.8.2
在我们的应用中,我们调用
ConsumerMetadataResponse.readFrom(channel.receive().buffer())
接收方法如下
def receive(): Receive = {
if(!connected)
throw new ClosedChannelException()
val response = new BoundedByteBufferReceive()
response.readCompletely(readChannel)
response
}
如我们所见returns一个kafka.network.Receive,这是一个扩展了特征kafka.network.Transmission的特征。
在这个Receive中,buffer方法是存在的,在kafka.network.BoundedByteBufferReceive
中被重写了
def buffer: ByteBuffer = {
expectComplete()
contentBuffer
}
卡夫卡 0.9.0
我们将上一行更改为
GroupCoordinatorResponse.readFrom(channel.receive().payload())
本版本API的receive方法如下
def receive(): NetworkReceive = {
if(!connected)
throw new ClosedChannelException()
val response = readCompletely(readChannel)
response.payload().rewind()
response
}
private def readCompletely(channel: ReadableByteChannel): NetworkReceive = {
val response = new NetworkReceive
while (!response.complete())
response.readFromReadableChannel(channel)
response
}
正如我们所看到的,这个returns而不是一个kafka.network.NetworkReceive,它是一个class实现了接口kafka.network.Receive,现在写在java和上一个完全不同。
这里没有buffer方法,只有returns
内容的payload方法
private ByteBuffer buffer;
我们怎么解决?
提前致谢
Kafka 0.9 维护旧的 Kafka 消费者以实现 backward-compatibility 与 Kafka 0.8.2 代理。
您正在使用 Kafka 0.9 中仍然存在的旧消费者来读取来自 Kafka 0.9 的消息。
您应该开始使用 Kafka 0.9 的新消费者 API 从 Kafka 0.9 代理读取数据。
希望对您有所帮助。
我们遇到了与我们的应用程序代码从 Apache Kafka 版本 0.8.2.1 迁移到 0.9.0.0 相关的问题。
在这种情况下,我们指的是 Cloudera 发布的 Kafka 版本:
kafka_2.10-0.8.2.0-kafka-1.3.2
kafka_2.11-0.9.0-kafka-2.0.2
我们在读取和写入 __consumer_offsets 元数据主题上的偏移量时检测到问题。 特别是,我们使用 BlockingChannel 连接到 Kafka Broker,在调用 receive() 方法时我们得到一个 EOFException。
特别是:
java.io.EOFException
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel (NetworkReceive.java:83)
at kafka.network.BlockingChannel.readCompletely (BlockingChannel.scala: 129)
at kafka.network.BlockingChannel.receive (BlockingChannel.scala: 120)
一个可能的原因可能是两个版本的 Kafka 之间的差异 API。
卡夫卡 0.8.2
在我们的应用中,我们调用
ConsumerMetadataResponse.readFrom(channel.receive().buffer())
接收方法如下
def receive(): Receive = {
if(!connected)
throw new ClosedChannelException()
val response = new BoundedByteBufferReceive()
response.readCompletely(readChannel)
response
}
如我们所见returns一个kafka.network.Receive,这是一个扩展了特征kafka.network.Transmission的特征。 在这个Receive中,buffer方法是存在的,在kafka.network.BoundedByteBufferReceive
中被重写了def buffer: ByteBuffer = {
expectComplete()
contentBuffer
}
卡夫卡 0.9.0
我们将上一行更改为
GroupCoordinatorResponse.readFrom(channel.receive().payload())
本版本API的receive方法如下
def receive(): NetworkReceive = {
if(!connected)
throw new ClosedChannelException()
val response = readCompletely(readChannel)
response.payload().rewind()
response
}
private def readCompletely(channel: ReadableByteChannel): NetworkReceive = {
val response = new NetworkReceive
while (!response.complete())
response.readFromReadableChannel(channel)
response
}
正如我们所看到的,这个returns而不是一个kafka.network.NetworkReceive,它是一个class实现了接口kafka.network.Receive,现在写在java和上一个完全不同。 这里没有buffer方法,只有returns
内容的payload方法 private ByteBuffer buffer;
我们怎么解决? 提前致谢
Kafka 0.9 维护旧的 Kafka 消费者以实现 backward-compatibility 与 Kafka 0.8.2 代理。 您正在使用 Kafka 0.9 中仍然存在的旧消费者来读取来自 Kafka 0.9 的消息。 您应该开始使用 Kafka 0.9 的新消费者 API 从 Kafka 0.9 代理读取数据。
希望对您有所帮助。