带有广播连接的 Spark 流式传输

Spark streaming with broadcast joins

我有一个火花流用例,我计划在每个执行器上广播和缓存一个数据集。流中的每个微批次都会从 RDD 中创建一个数据帧并加入批次。下面给出的测试代码将为每批执行广播操作。有没有办法只播放一次?

val testDF = sqlContext.read.format("com.databricks.spark.csv")
                .schema(schema).load("file:///shared/data/test-data.txt") 

val lines = ssc.socketTextStream("DevNode", 9999)

lines.foreachRDD((rdd, timestamp) => {
    val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt, l(1))).toDF()
    val resultDF = recordDF.join(broadcast(testDF), "Age")
    resultDF.write.format("com.databricks.spark.csv").save("file:///shared/data/output/streaming/"+timestamp)
    }

对于每批读取此文件并执行广播。

16/02/18 12:24:02 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:27+28
16/02/18 12:24:02 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:0+27

16/02/18 12:25:00 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:27+28
16/02/18 12:25:00 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:0+27

有关于仅一次广播数据集的建议吗?

目前广播表似乎没有被重用。参见:SPARK-3863

foreachRDD 循环外执行广播:

val testDF = broadcast(sqlContext.read.format("com.databricks.spark.csv")
 .schema(schema).load(...))

lines.foreachRDD((rdd, timestamp) => { 
  val recordDF = ???
  val resultDF = recordDF.join(testDF, "Age")
  resultDF.write.format("com.databricks.spark.csv").save(...)
}