Samza 0.14.1 没有正确处理 OffsetOutOfRangeException 异常?
Samza 0.14.1 not correctly handling OffsetOutOfRangeException exception?
我们面临着与此 thread 中所述相同的问题。
此处 - Samza 正在请求太旧的 Kafka 分区偏移量(即 Kafka 日志已向前移动)。我们正在设置 属性
consumer.auto.offset.reset
到 smallest
,因此期望 Samza 在这种情况下将其检查点重置为最早可用的分区偏移量。但这并没有发生,我们不断收到这种形式的异常:
INFO [2018-08-21 19:26:20,924] [U:669,F:454,T:1,123,M:2,658]
kafka.producer.SyncProducer:[Logging_class:info:66] - [main] -
Disconnecting from vrni-platform-release:9092
INFO [2018-08-21 19:26:20,924] [U:669,F:454,T:1,123,M:2,658]
system.kafka.GetOffset:[Logging_class:info:63] - [main] - Validating offset
56443499 for topic and partition Topic3-0
WARN [2018-08-21 19:26:20,925] [U:669,F:454,T:1,123,M:2,658]
system.kafka.KafkaSystemConsumer:[Logging_class:warn:74] - [main] - While
refreshing brokers for Topic3-0:
org.apache.kafka.common.errors.OffsetOutOfRangeException: The requested
offset is not within the range of offsets maintained by the server..
Retrying
版本详情
- 萨姆扎:2.11-0.14.1
- 卡夫卡客户端:1.1.0
- Kafka 服务器:1.1.0 Scala 2.11
浏览代码,似乎 GetOffset::isValidOffset
应该能够捕获异常 OffsetOutOfRangeException
并将其转换为假值。但似乎这并没有发生。 Exception
的 package
会不会不匹配? GetOffSet class 正在赶上
异常 import kafka.common.OffsetOutOfRangeException
,但从日志来看,这个 class 的包似乎不同。会不会是这个原因?
def isValidOffset(consumer: DefaultFetchSimpleConsumer, topicAndPartition: TopicAndPartition, offset: String) = {
info("Validating offset %s for topic and partition %s" format (offset, topicAndPartition))
try {
val messages = consumer.defaultFetch((topicAndPartition, offset.toLong))
if (messages.hasError) {
KafkaUtil.maybeThrowException(messages.error(topicAndPartition.topic, topicAndPartition.partition).exception())
}
info("Able to successfully read from offset %s for topic and partition %s. Using it to instantiate consumer." format (offset, topicAndPartition))
true
} catch {
case e: OffsetOutOfRangeException => false
}
}
此外,似乎 BrokerProxy class - GetOffset
的调用者会打印一条日志 "It appears that..."
以防它获得错误值,但事实并非如此记录此行(表明 GetOffset
方法中生成的某些异常未被捕获并向上传播):
def addTopicPartition(tp: TopicAndPartition, nextOffset: Option[String]) = {
debug("Adding new topic and partition %s to queue for %s" format (tp, host))
if (nextOffsets.asJava.containsKey(tp)) {
toss("Already consuming TopicPartition %s" format tp)
}
val offset = if (nextOffset.isDefined && offsetGetter.isValidOffset(simpleConsumer, tp, nextOffset.get)) {
nextOffset
.get
.toLong
} else {
warn("It appears that we received an invalid or empty offset %s for %s. Attempting to use Kafka's auto.offset.reset setting. This can result in data loss if processing continues." format (nextOffset, tp))
offsetGetter.getResetOffset(simpleConsumer, tp)
}
debug("Got offset %s for new topic and partition %s." format (offset, tp))
nextOffsets += tp -> offset
metrics.topicPartitions.get((host, port)).set(nextOffsets.size)
}
这可能是因为我们使用的 Kafka 客户端库版本不匹配?
是否有我们应该与 Samza 0.14.1 一起使用的推荐 Kafka 客户端版本(假设 Kafka 服务器是 1.x)?
如有任何帮助,我们将不胜感激。
以上是 samza 0.14.0 和 0.14.1 中的错误。 SAMZA-1822 是错误 ID。
这也在 samza mailing list 中讨论。
我们面临着与此 thread 中所述相同的问题。
此处 - Samza 正在请求太旧的 Kafka 分区偏移量(即 Kafka 日志已向前移动)。我们正在设置 属性
consumer.auto.offset.reset
到 smallest
,因此期望 Samza 在这种情况下将其检查点重置为最早可用的分区偏移量。但这并没有发生,我们不断收到这种形式的异常:
INFO [2018-08-21 19:26:20,924] [U:669,F:454,T:1,123,M:2,658]
kafka.producer.SyncProducer:[Logging_class:info:66] - [main] -
Disconnecting from vrni-platform-release:9092
INFO [2018-08-21 19:26:20,924] [U:669,F:454,T:1,123,M:2,658]
system.kafka.GetOffset:[Logging_class:info:63] - [main] - Validating offset
56443499 for topic and partition Topic3-0
WARN [2018-08-21 19:26:20,925] [U:669,F:454,T:1,123,M:2,658]
system.kafka.KafkaSystemConsumer:[Logging_class:warn:74] - [main] - While
refreshing brokers for Topic3-0:
org.apache.kafka.common.errors.OffsetOutOfRangeException: The requested
offset is not within the range of offsets maintained by the server..
Retrying
版本详情
- 萨姆扎:2.11-0.14.1
- 卡夫卡客户端:1.1.0
- Kafka 服务器:1.1.0 Scala 2.11
浏览代码,似乎 GetOffset::isValidOffset
应该能够捕获异常 OffsetOutOfRangeException
并将其转换为假值。但似乎这并没有发生。 Exception
的 package
会不会不匹配? GetOffSet class 正在赶上
异常 import kafka.common.OffsetOutOfRangeException
,但从日志来看,这个 class 的包似乎不同。会不会是这个原因?
def isValidOffset(consumer: DefaultFetchSimpleConsumer, topicAndPartition: TopicAndPartition, offset: String) = {
info("Validating offset %s for topic and partition %s" format (offset, topicAndPartition))
try {
val messages = consumer.defaultFetch((topicAndPartition, offset.toLong))
if (messages.hasError) {
KafkaUtil.maybeThrowException(messages.error(topicAndPartition.topic, topicAndPartition.partition).exception())
}
info("Able to successfully read from offset %s for topic and partition %s. Using it to instantiate consumer." format (offset, topicAndPartition))
true
} catch {
case e: OffsetOutOfRangeException => false
}
}
此外,似乎 BrokerProxy class - GetOffset
的调用者会打印一条日志 "It appears that..."
以防它获得错误值,但事实并非如此记录此行(表明 GetOffset
方法中生成的某些异常未被捕获并向上传播):
def addTopicPartition(tp: TopicAndPartition, nextOffset: Option[String]) = {
debug("Adding new topic and partition %s to queue for %s" format (tp, host))
if (nextOffsets.asJava.containsKey(tp)) {
toss("Already consuming TopicPartition %s" format tp)
}
val offset = if (nextOffset.isDefined && offsetGetter.isValidOffset(simpleConsumer, tp, nextOffset.get)) {
nextOffset
.get
.toLong
} else {
warn("It appears that we received an invalid or empty offset %s for %s. Attempting to use Kafka's auto.offset.reset setting. This can result in data loss if processing continues." format (nextOffset, tp))
offsetGetter.getResetOffset(simpleConsumer, tp)
}
debug("Got offset %s for new topic and partition %s." format (offset, tp))
nextOffsets += tp -> offset
metrics.topicPartitions.get((host, port)).set(nextOffsets.size)
}
这可能是因为我们使用的 Kafka 客户端库版本不匹配? 是否有我们应该与 Samza 0.14.1 一起使用的推荐 Kafka 客户端版本(假设 Kafka 服务器是 1.x)?
如有任何帮助,我们将不胜感激。
以上是 samza 0.14.0 和 0.14.1 中的错误。 SAMZA-1822 是错误 ID。
这也在 samza mailing list 中讨论。