Spark streaming with broadcast joins
I have a Spark Streaming use case where I plan to broadcast a dataset and cache it on each executor. Every micro-batch in the stream creates a DataFrame out of the RDD and joins it with this dataset. The test code below performs the broadcast operation for every batch. Is there a way to broadcast it just once?
val testDF = sqlContext.read.format("com.databricks.spark.csv")
  .schema(schema).load("file:///shared/data/test-data.txt")

val lines = ssc.socketTextStream("DevNode", 9999)
lines.foreachRDD((rdd, timestamp) => {
  val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt, l(1))).toDF()
  val resultDF = recordDF.join(broadcast(testDF), "Age")
  resultDF.write.format("com.databricks.spark.csv").save("file:///shared/data/output/streaming/" + timestamp)
})
The file is read and the broadcast is performed for every batch:
16/02/18 12:24:02 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:27+28
16/02/18 12:24:02 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:0+27
16/02/18 12:25:00 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:27+28
16/02/18 12:25:00 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:0+27
Any suggestions on how to broadcast the dataset only once?
It looks like broadcast tables are not reused at the moment. See: SPARK-3863
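Worth noting: broadcast(...) is only a hint to the query planner, and each micro-batch action re-executes the whole plan, including the file scan, which matches the repeated HadoopRDD input-split lines above. Independent of the fix below, caching the static DataFrame keeps the source file from being re-read on every batch; a minimal sketch, assuming the Spark 1.x sqlContext API from the question:

testDF.cache()   // keep the static side on the executors after first computation
testDF.count()   // materialize the cache once, before the stream starts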
Perform the broadcast outside the foreachRDD loop:
val testDF = broadcast(sqlContext.read.format("com.databricks.spark.csv")
  .schema(schema).load(...))

lines.foreachRDD((rdd, timestamp) => {
  val recordDF = ???
  val resultDF = recordDF.join(testDF, "Age")
  resultDF.write.format("com.databricks.spark.csv").save(...)
})
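For completeness, a minimal end-to-end sketch with the placeholders filled in from the question's own code (the paths, hostname, port, and Record mapping are the question's examples, not verified values):

import org.apache.spark.sql.functions.broadcast
import sqlContext.implicits._  // needed for toDF() on the per-batch RDD

// Read the static dataset once and attach the broadcast hint outside the loop
val testDF = broadcast(sqlContext.read.format("com.databricks.spark.csv")
  .schema(schema).load("file:///shared/data/test-data.txt"))

val lines = ssc.socketTextStream("DevNode", 9999)
lines.foreachRDD((rdd, timestamp) => {
  // Build the per-batch DataFrame from the incoming lines
  val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt, l(1))).toDF()
  // testDF already carries the hint, so no per-batch broadcast() call is needed
  val resultDF = recordDF.join(testDF, "Age")
  resultDF.write.format("com.databricks.spark.csv")
    .save("file:///shared/data/output/streaming/" + timestamp)
})

Combined with the cache() call above, this at least keeps the source file from being scanned on every micro-batch.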