Right way to read stream from Kafka topic using checkpointLocation offsets
I am trying to develop a small Spark application (in Scala) that reads messages from Kafka (Confluent) and writes (inserts) them into a Hive table. Everything works as expected except for one important feature: managing offsets when the application is restarted (submitted again). That part confuses me.
An excerpt from my code:
def main(args: Array[String]): Unit = {
  val sparkSess = SparkSession
    .builder
    .appName("Kafka_to_Hive")
    .config("spark.sql.warehouse.dir", "/user/hive/warehouse/")
    .config("hive.metastore.uris", "thrift://localhost:9083")
    .config("hive.exec.dynamic.partition", "true")
    .config("hive.exec.dynamic.partition.mode", "nonstrict")
    .enableHiveSupport()
    .getOrCreate()

  sparkSess.sparkContext.setLogLevel("ERROR")

  // don't consider this code block please, it's just a part of Confluent avro message deserializing adventures
  sparkSess.udf.register("deserialize", (bytes: Array[Byte]) =>
    DeserializerWrapper.deserializer.deserialize(bytes)
  )

  val kafkaDataFrame = sparkSess
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("group.id", "kafka-to-hive-1")
    // ------> which Kafka options do I need to set here for starting from the last right offset to ensure completeness of data and "exactly once" writing? <--------
    .option("failOnDataLoss", (false: java.lang.Boolean))
    .option("subscribe", "some_topic")
    .load()

  import org.apache.spark.sql.functions._

  // don't consider this code block please, it's just a part of Confluent avro message deserializing adventures
  val valueDataFrame = kafkaDataFrame.selectExpr("""deserialize(value) AS message""")
  val df = valueDataFrame.select(
      from_json(col("message"), sparkSchema.dataType).alias("parsed_value"))
    .select("parsed_value.*")

  df.writeStream
    .foreachBatch((batchDataFrame, batchId) => {
      batchDataFrame.createOrReplaceTempView("`some_view_name`")
      val sqlText = "SELECT * FROM `some_view_name` a WHERE some_field = 'some value'"
      val batchDataFrame_view = batchDataFrame.sparkSession.sql(sqlText)
      batchDataFrame_view.write.insertInto("default.some_hive_table")
    })
    .option("checkpointLocation", "/user/some_user/tmp/checkpointLocation")
    .start()
    .awaitTermination()
}
Questions (they are related to each other):
- Which Kafka options do I need to apply on readStream.format("kafka") to start from the last correct offset on every submit of the Spark app?
- Do I need to manually read the 3rd line of the checkpointLocation/offsets/latest_batch file to find the last offsets to read from Kafka? I mean something like this:
readStream.format("kafka").option("startingOffsets", """{"some_topic":{"2":35079,"5":34854,"4":35537,"1":35357,"3":35436,"0":35213}}""")
- What is the right/convenient way to read a stream from a Kafka (Confluent) topic? (I'm not considering Kafka's own offset storage engine.)
"Which Kafka options do I need to apply on readStream.format("kafka") for starting from last right offset on every submit of spark app?"
You need to set startingOffsets=latest and clean up the checkpoint files.
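For illustration, a rough sketch of that reader configuration, reusing the broker and topic from the question (this assumes the checkpoint directory has been emptied beforehand, since startingOffsets is only honoured when no checkpoint exists yet):

// Sketch only: start from the latest offsets on a fresh run.
// startingOffsets is only consulted when the query has no checkpoint yet,
// so the directory under checkpointLocation must be new or empty.
val kafkaDataFrame = sparkSess
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "some_topic")
  .option("startingOffsets", "latest") // ignored once a checkpoint exists
  .option("failOnDataLoss", "false")
  .load()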
"Do I need to manually read 3rd line of checkpointLocation/offsets/latest_batch file to find last offsets to read from Kafka? I mean something like that: readStream.format("kafka").option("startingOffsets", """{"some_topic":{"2":35079,"5":34854,"4":35537,"1":35357,"3":35436,"0":35213}}""")"
Similar to the first question: if you set startingOffsets to a JSON string, you need to delete the checkpoint files. Otherwise, the Spark application will always pick up the information stored in the checkpoint files and override whatever is given in the startingOffsets option.
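If you do want to pin specific offsets via a JSON string, the old checkpoint has to be removed first, otherwise it wins. A hedged sketch of that cleanup using the Hadoop FileSystem API (the path is the one from the question; deleting a checkpoint also discards the query's progress, so treat this as a deliberate reset, not routine code):

import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch only: drop the existing checkpoint so that startingOffsets is honoured again.
val checkpointDir = new Path("/user/some_user/tmp/checkpointLocation")
val fs = FileSystem.get(sparkSess.sparkContext.hadoopConfiguration)
if (fs.exists(checkpointDir)) {
  fs.delete(checkpointDir, true) // recursive delete
}

// After that, a JSON startingOffsets such as the one from the question takes effect:
// .option("startingOffsets", """{"some_topic":{"2":35079,"5":34854,"4":35537,"1":35357,"3":35436,"0":35213}}""")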
"What is the right/convenient way to read stream from Kafka (Confluent) topic? (I'm not considering offsets storing engine of Kafka)"
Asking for the "right way" may lead to opinion-based answers and is therefore off-topic on Stack Overflow. Anyway, in my experience, using Spark Structured Streaming for this is already a mature and production-ready approach. However, it is always worth looking into Kafka Connect as well.
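For completeness, the pattern the checkpoint mechanism is built for is to leave startingOffsets alone after the first run and let the checkpoint drive restarts. A minimal sketch along the lines of the question's code (the simplified sink body here is an assumption, not the original filtering logic):

import org.apache.spark.sql.DataFrame

// Sketch only: on the first run, startingOffsets (default "latest") decides where to begin;
// on every restart, Spark resumes from the offsets stored under checkpointLocation.
def writeBatch(batchDataFrame: DataFrame, batchId: Long): Unit = {
  batchDataFrame.write.insertInto("default.some_hive_table") // simplified sink
}

df.writeStream
  .foreachBatch(writeBatch _)
  .option("checkpointLocation", "/user/some_user/tmp/checkpointLocation")
  .start()
  .awaitTermination()

Note that foreachBatch itself only gives at-least-once guarantees; for "exactly once" into Hive the batch write would additionally need to be idempotent or deduplicated, for example by keying on batchId.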