具有高级消费者的 Apache Kafka:跳过损坏的消息

Apache Kafka with High Level Consumer: Skip corrupted messages

我正面临高级 kafka 消费者 (0.8.2.0) 的问题 - 在消耗了一定数量的数据后,我们的一位消费者停止了。重新启动后,它会消耗一些消息并再次停止,没有 error/exception 或警告。

经过一番调查,我发现消费者的问题是这个异常:

ERROR c.u.u.e.impl.kafka.KafkaConsumer  - Error consuming message stream:
 kafka.message.InvalidMessageException: Message is corrupt (stored crc = 3801080313, computed crc = 2728178222)

有什么想法可以让我完全跳过此类消息吗?

所以,回答我自己的问题。经过对Kafka Consumer的一些调试,我找到了一种可能的解决方案:

  1. 创建 kafka.consumer.ConsumerIterator
  2. 的子类
  3. 覆盖makeNext-方法。在此方法中捕获 InvalidMessageException 和 return 一些虚拟占位符。
  4. 在您的 while 循环中,您必须将 kafka.consumer.ConsumerIterator 转换为您的实现。不幸的是 kafka.consumer.ConsumerIterator 的所有字段都是私有的,因此您必须使用反射。

所以这是代码示例:

val skipIt = createKafkaSkippingIterator(ks.iterator())

while(skipIt.hasNext()) {
  val messageAndTopic = skipIt.next()

  if (messageNotCorrupt(messageAndTopic)) {
    consumeFn(messageAndTopic)
  }
}

messageNotCorrupt-方法只是检查参数是否等于虚拟消息。

另一个解决方案,可能更简单,使用 Kafka 0.8.2 客户端。

try {
  val m = it.next()
  //...
} catch {
  case e: kafka.message.InvalidMessageException ⇒
    log.warn("Corrupted message. Skipping.", e)
    resetIteratorState(it)
}

//...

def resetIteratorState(it: ConsumerIterator[Array[Byte], Array[Byte]]): Unit = {
  val method = classOf[IteratorTemplate[_]].getDeclaredMethod("resetState")
  method.setAccessible(true)
  method.invoke(it)
}