从 Kafka 主题读取文件路径,然后读取文件并写入 Structured Streaming 中的 DeltaLake
Read file path from Kafka topic and then read file and write to DeltaLake in Structured Streaming
我有一个用例,其中存储在 s3 中的 json 记录的文件路径作为 kafka 出现
卡夫卡中的消息。我必须使用 spark 结构化流处理数据。
我想到的设计如下:
- 在kafka Spark结构化流中,读取包含数据路径的消息。
- 收集驱动中的消息记录。 (消息尺寸较小)
- 从数据位置创建数据框。
kafkaDf.select($"value".cast(StringType))
.writeStream.foreachBatch((batchDf:DataFrame, batchId:Long) => {
//rough code
//collect to driver
val records = batchDf.collect()
//create dataframe and process
records foreach((rec: Row) =>{
println("records:######################", rec.toString())
val path = rec.getAs[String]("data_path")
val dfToProcess = spark.read.json(path)
....
})
}
想知道大家的看法,这种做法好不好?特别是在调用 collect 后创建 Dataframe 时是否存在问题。
如果有更好的方法,请告诉我。
你的想法很完美。
实际上,必须将您的 Dataframe 收集到驱动程序。否则,您无法通过在每个执行器上调用 SparkSession 来创建分布式数据集。没有 collect
你最终会遇到 NullPointerException。
我稍微重写了你的代码框架,还实现了关于如何将你的 Dataframe 写入增量的部分 table(基于你的其他 question)。此外,我使用 Dataset[String]
而不是 Dataframe[Row]
,这让生活更轻松。
将 Spark 3.0.1 与 delta-core 0.7.0 一起使用效果很好。例如,我的测试文件看起来像
{"a":"foo1","b":"bar1"}
{"a":"foo2","b":"bar2"}
我将该文件的位置发送到名为“test”的 Kafka 主题,并应用以下代码来解析该文件并将其列(基于给定模式)写入增量 table 使用代码如下:
val spark = SparkSession.builder()
.appName("KafkaConsumer")
.master("local[*]")
.getOrCreate()
val jsonSchema = new StructType()
.add("a", StringType)
.add("b", StringType)
val deltaPath = "file:///tmp/spark/delta/test"
import spark.implicits._
val kafkaDf = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "test")
.option("startingOffsets", "latest")
.option("failOnDataLoss", "false")
.load()
.selectExpr("CAST(value AS STRING) as data_path")
.as[String]
kafkaDf.writeStream.foreachBatch((batchDf:Dataset[String], batchId:Long) => {
// collect to driver
val records = batchDf.collect()
// create dataframe based on file location and process and write to Delta-Lake
records.foreach((path: String) => {
val dfToProcess = spark.read.schema(jsonSchema).json(path)
dfToProcess.show(false) // replace this line with your custom processing logic
dfToProcess.write.format("delta").save(deltaPath)
})
}).start()
spark.streams.awaitAnyTermination()
show
调用的输出符合预期:
+----+----+
|a |b |
+----+----+
|foo1|bar1|
|foo2|bar2|
+----+----+
并且数据已作为增量 table 写入通过 deltaPath
指定的位置
/tmp/spark/delta/test$ ll
total 20
drwxrwxr-x 3 x x 4096 Jan 20 13:37 ./
drwxrwxr-x 3 x x 4096 Jan 20 13:37 ../
drwxrwxr-x 2 x x 4096 Jan 20 13:37 _delta_log/
-rw-r--r-- 1 x x 595 Jan 20 13:37 part-00000-b6a540ec-7e63-4d68-a09a-405142479cc1-c000.snappy.parquet
-rw-r--r-- 1 x x 16 Jan 20 13:37 .part-00000-b6a540ec-7e63-4d68-a09a-405142479cc1-c000.snappy.parquet.crc
我有一个用例,其中存储在 s3 中的 json 记录的文件路径作为 kafka 出现 卡夫卡中的消息。我必须使用 spark 结构化流处理数据。
我想到的设计如下:
- 在kafka Spark结构化流中,读取包含数据路径的消息。
- 收集驱动中的消息记录。 (消息尺寸较小)
- 从数据位置创建数据框。
kafkaDf.select($"value".cast(StringType))
.writeStream.foreachBatch((batchDf:DataFrame, batchId:Long) => {
//rough code
//collect to driver
val records = batchDf.collect()
//create dataframe and process
records foreach((rec: Row) =>{
println("records:######################", rec.toString())
val path = rec.getAs[String]("data_path")
val dfToProcess = spark.read.json(path)
....
})
}
想知道大家的看法,这种做法好不好?特别是在调用 collect 后创建 Dataframe 时是否存在问题。 如果有更好的方法,请告诉我。
你的想法很完美。
实际上,必须将您的 Dataframe 收集到驱动程序。否则,您无法通过在每个执行器上调用 SparkSession 来创建分布式数据集。没有 collect
你最终会遇到 NullPointerException。
我稍微重写了你的代码框架,还实现了关于如何将你的 Dataframe 写入增量的部分 table(基于你的其他 question)。此外,我使用 Dataset[String]
而不是 Dataframe[Row]
,这让生活更轻松。
将 Spark 3.0.1 与 delta-core 0.7.0 一起使用效果很好。例如,我的测试文件看起来像
{"a":"foo1","b":"bar1"}
{"a":"foo2","b":"bar2"}
我将该文件的位置发送到名为“test”的 Kafka 主题,并应用以下代码来解析该文件并将其列(基于给定模式)写入增量 table 使用代码如下:
val spark = SparkSession.builder()
.appName("KafkaConsumer")
.master("local[*]")
.getOrCreate()
val jsonSchema = new StructType()
.add("a", StringType)
.add("b", StringType)
val deltaPath = "file:///tmp/spark/delta/test"
import spark.implicits._
val kafkaDf = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "test")
.option("startingOffsets", "latest")
.option("failOnDataLoss", "false")
.load()
.selectExpr("CAST(value AS STRING) as data_path")
.as[String]
kafkaDf.writeStream.foreachBatch((batchDf:Dataset[String], batchId:Long) => {
// collect to driver
val records = batchDf.collect()
// create dataframe based on file location and process and write to Delta-Lake
records.foreach((path: String) => {
val dfToProcess = spark.read.schema(jsonSchema).json(path)
dfToProcess.show(false) // replace this line with your custom processing logic
dfToProcess.write.format("delta").save(deltaPath)
})
}).start()
spark.streams.awaitAnyTermination()
show
调用的输出符合预期:
+----+----+
|a |b |
+----+----+
|foo1|bar1|
|foo2|bar2|
+----+----+
并且数据已作为增量 table 写入通过 deltaPath
/tmp/spark/delta/test$ ll
total 20
drwxrwxr-x 3 x x 4096 Jan 20 13:37 ./
drwxrwxr-x 3 x x 4096 Jan 20 13:37 ../
drwxrwxr-x 2 x x 4096 Jan 20 13:37 _delta_log/
-rw-r--r-- 1 x x 595 Jan 20 13:37 part-00000-b6a540ec-7e63-4d68-a09a-405142479cc1-c000.snappy.parquet
-rw-r--r-- 1 x x 16 Jan 20 13:37 .part-00000-b6a540ec-7e63-4d68-a09a-405142479cc1-c000.snappy.parquet.crc