无法在kafka direct stream、Spark streaming中手动提交offset
Unable to manually commit offset in kafka direct stream, Spark streaming
我正在尝试验证手动偏移提交的工作情况。
当我尝试通过在 while 循环中使用 thread.sleep()/jssc.stop()/ 抛出异常来退出作业时,我看到正在提交偏移量。
我只是为了测试而发送了几条消息,但是作业开始处理批处理后我发现延迟为 0。
spark 何时实际提交偏移量?
JavaInputDStream<ConsumerRecord<String, String>> kafkaStream = KafkaUtils.createDirectStream(jssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));
kafkaStream.foreachRDD(kafkaStreamRDD -> {
// fetch kafka offsets for manually committing it later
OffsetRange[] offsetRanges = ((HasOffsetRanges) kafkaStreamRDD.rdd()).offsetRanges();
// filter unwanted data
kafkaStreamRDD.filter(new Function<ConsumerRecord<String, String>, Boolean>() {
//filter logic here
}).foreachPartition(kafkaRecords -> {
//Initializing DB connections
while (kafkaRecords.hasNext()) {
//doing some work here
//-----> EXCEPTION
throw new Exception();
}
});
// commit offsets saveOffsets after processing
((CanCommitOffsets) kafkaStream.inputDStream()).commitAsync(offsetRanges, (offsets, exception) -> {
if (exception != null) {
System.out.println("-------------Unable to commit offsets, something went wrong, trace ------------"+ exception.getCause());
exception.printStackTrace(); // need this for driver
} else {
System.out.println("Successfully committed offsets"); // need this for driver
for (OffsetRange offsetRange : offsetRanges) {
System.out.println("Offset Info: paratition {}, fromOffset {} untilOffset {}: "+ offsetRange.partition() +":"+ offsetRange.fromOffset() +":"+ offsetRange.untilOffset());
}
}
});
enable.auto.commit
: 假
观察 while 循环中的 throw new Exception();
。即使批处理由于异常而失败,我也看到提交的偏移量,由于处理失败,我预计这里会有一些滞后,这里有什么问题?
Kafka 上的 Spark 结构化流的美妙之处在于它提供了 Kafka Stream 中不可用的手动偏移量。 Spark 流提交是线程安全的,本质上是异步的,并且由于 Kafka 不是事务性的,因此您的输出必须仍然是幂等的。这意味着当您开始使用消息时,您的偏移量会不断增加,而提交可能会在稍后出现。
与 HasOffsetRanges 一样,只有在调用 createDirectStream 的结果时才会成功转换为 CanCommitOffsets,而不是在转换之后。 commitAsync 调用是线程安全的,但必须在输出之后发生。
您可以使用回调来检查您的提交执行情况,如下所示
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges, new OffsetCommitCallback() {
def onComplete(m: java.util.Map[TopicPartition, OffsetAndMetadata], e: Exception) {
m.foreach(f => {
if (null != e) {
logger.info("Failed to cmomit:" + f._1 + "," + f._2)
logger.info("Error while commitAsync. Retry again"+e.toString)
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
} else {
println("Offset commit:" + f._1 + "," + f._2)
}
})
}
})
如果工作节点出现异常,则重新提交任务的最大值为 spark.task.maxFailures
(放弃作业前任何特定任务的失败次数)。一旦处理了 Dstream 批处理,就会提交偏移量。您必须根据您的用例处理异常(记录错误记录或转发记录到 DLQ)。
我正在尝试验证手动偏移提交的工作情况。
当我尝试通过在 while 循环中使用 thread.sleep()/jssc.stop()/ 抛出异常来退出作业时,我看到正在提交偏移量。
我只是为了测试而发送了几条消息,但是作业开始处理批处理后我发现延迟为 0。
spark 何时实际提交偏移量?
JavaInputDStream<ConsumerRecord<String, String>> kafkaStream = KafkaUtils.createDirectStream(jssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));
kafkaStream.foreachRDD(kafkaStreamRDD -> {
// fetch kafka offsets for manually committing it later
OffsetRange[] offsetRanges = ((HasOffsetRanges) kafkaStreamRDD.rdd()).offsetRanges();
// filter unwanted data
kafkaStreamRDD.filter(new Function<ConsumerRecord<String, String>, Boolean>() {
//filter logic here
}).foreachPartition(kafkaRecords -> {
//Initializing DB connections
while (kafkaRecords.hasNext()) {
//doing some work here
//-----> EXCEPTION
throw new Exception();
}
});
// commit offsets saveOffsets after processing
((CanCommitOffsets) kafkaStream.inputDStream()).commitAsync(offsetRanges, (offsets, exception) -> {
if (exception != null) {
System.out.println("-------------Unable to commit offsets, something went wrong, trace ------------"+ exception.getCause());
exception.printStackTrace(); // need this for driver
} else {
System.out.println("Successfully committed offsets"); // need this for driver
for (OffsetRange offsetRange : offsetRanges) {
System.out.println("Offset Info: paratition {}, fromOffset {} untilOffset {}: "+ offsetRange.partition() +":"+ offsetRange.fromOffset() +":"+ offsetRange.untilOffset());
}
}
});
enable.auto.commit
: 假
观察 while 循环中的 throw new Exception();
。即使批处理由于异常而失败,我也看到提交的偏移量,由于处理失败,我预计这里会有一些滞后,这里有什么问题?
Kafka 上的 Spark 结构化流的美妙之处在于它提供了 Kafka Stream 中不可用的手动偏移量。 Spark 流提交是线程安全的,本质上是异步的,并且由于 Kafka 不是事务性的,因此您的输出必须仍然是幂等的。这意味着当您开始使用消息时,您的偏移量会不断增加,而提交可能会在稍后出现。 与 HasOffsetRanges 一样,只有在调用 createDirectStream 的结果时才会成功转换为 CanCommitOffsets,而不是在转换之后。 commitAsync 调用是线程安全的,但必须在输出之后发生。
您可以使用回调来检查您的提交执行情况,如下所示
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges, new OffsetCommitCallback() {
def onComplete(m: java.util.Map[TopicPartition, OffsetAndMetadata], e: Exception) {
m.foreach(f => {
if (null != e) {
logger.info("Failed to cmomit:" + f._1 + "," + f._2)
logger.info("Error while commitAsync. Retry again"+e.toString)
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
} else {
println("Offset commit:" + f._1 + "," + f._2)
}
})
}
})
如果工作节点出现异常,则重新提交任务的最大值为 spark.task.maxFailures
(放弃作业前任何特定任务的失败次数)。一旦处理了 Dstream 批处理,就会提交偏移量。您必须根据您的用例处理异常(记录错误记录或转发记录到 DLQ)。