Why does Spark Streaming with Kafka always poll(0) before seekToEnd?
- Spark version: 2.4.5
- Component: Spark Streaming
- Class: DirectKafkaInputDStream

In class DirectKafkaInputDStream, I'm a bit confused about why paranoidPoll is called before seekToEnd:
protected def latestOffsets(): Map[TopicPartition, Long] = {
  val c = consumer
  paranoidPoll(c)
  val parts = c.assignment().asScala

  // make sure new partitions are reflected in currentOffsets
  val newPartitions = parts.diff(currentOffsets.keySet)

  // Check if there's any partition been revoked because of consumer rebalance.
  val revokedPartitions = currentOffsets.keySet.diff(parts)
  if (revokedPartitions.nonEmpty) {
    throw new IllegalStateException(s"Previously tracked partitions " +
      s"${revokedPartitions.mkString("[", ",", "]")} been revoked by Kafka because of consumer " +
      s"rebalance. This is mostly due to another stream with same group id joined, " +
      s"please check if there're different streaming application misconfigure to use same " +
      s"group id. Fundamentally different stream should use different group id")
  }

  // position for new partitions determined by auto.offset.reset if no commit
  currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> c.position(tp)).toMap

  // find latest available offsets
  c.seekToEnd(currentOffsets.keySet.asJava)
  parts.map(tp => tp -> c.position(tp)).toMap
}
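The partition bookkeeping in latestOffsets can be sketched with plain sets (a toy model using Int partition ids in place of TopicPartition, so it runs without a Kafka cluster):

```scala
// Toy model of latestOffsets' partition bookkeeping: Int ids stand in
// for TopicPartition so the logic runs without a Kafka broker.
object PartitionDiffSketch {
  // Returns (newPartitions, revokedPartitions) exactly as the diffs above do.
  def diff(tracked: Set[Int], assigned: Set[Int]): (Set[Int], Set[Int]) = {
    val newPartitions = assigned.diff(tracked)     // appeared since last batch
    val revokedPartitions = tracked.diff(assigned) // lost via rebalance -> fatal
    (newPartitions, revokedPartitions)
  }

  def main(args: Array[String]): Unit = {
    val (fresh, revoked) = diff(tracked = Set(0, 1), assigned = Set(1, 2))
    println(s"new=$fresh revoked=$revoked") // new=Set(2) revoked=Set(0)
    // A non-empty revoked set is exactly the IllegalStateException case above.
  }
}
```

A non-empty `revokedPartitions` means another consumer with the same group id took partitions away, which is why the real code treats it as a fatal misconfiguration.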
Without poll(0), assignment() may return an empty set: the poll forces the client to connect to the Kafka group coordinator, join the consumer group, and receive its partition assignment. Note, however, that kafka-clients has since deprecated poll(0) (KIP-266 replaced poll(long) with poll(Duration)), so check whether Spark offers an alternative API.

See also: KafkaConsumer assignment() returns empty
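The "paranoid" part of paranoidPoll is that a real poll may actually fetch records as a side effect, so after polling it seeks each partition back to the earliest offset it returned, leaving the data for the normal batch logic. A minimal sketch of that rewind computation, with a hypothetical Rec case class standing in for ConsumerRecord (no broker needed):

```scala
// Sketch of paranoidPoll's rewind step: for every partition the poll
// returned data for, seek back to the smallest returned offset so those
// records are re-read by the regular batch path. Rec is a stand-in type.
object ParanoidRewindSketch {
  case class Rec(partition: Int, offset: Long)

  // Offset per partition that the consumer should be seeked back to.
  def seekBackOffsets(polled: Seq[Rec]): Map[Int, Long] =
    polled.groupBy(_.partition).map { case (p, rs) => p -> rs.map(_.offset).min }

  def main(args: Array[String]): Unit = {
    val polled = Seq(Rec(0, 42), Rec(0, 43), Rec(1, 7))
    println(seekBackOffsets(polled)) // Map(0 -> 42, 1 -> 7)
  }
}
```

In the real DirectKafkaInputDStream this map is fed to consumer.seek(tp, offset) for each entry, so the poll's only lasting effect is the group join and assignment, not consumption.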