如何在 Spark Streaming 中将 RDD 转换为 DataFrame,而不仅仅是 Spark

How to convert RDD to DataFrame in Spark Streaming, not just Spark

如何在 Spark Streaming 中将 RDD 转换为 DataFrame,而不仅仅是 Spark

我看到了这个例子,但它需要 SparkContext

val sqlContext = new SQLContext(sc) 
import sqlContext.implicits._
rdd.toDF()

就我而言,我有 StreamingContext。然后我应该在 foreach 中创建 SparkContext 吗?看起来太疯狂了……那么,如何处理这个问题呢?我的最终目标(如果它可能有用的话)是使用 rdd.toDF.write.format("json").saveAsTextFile("s3://iiiii/ttttt.json"); 在 Amazon S3 中保存 DataFrame,如果不将其转换为 DataFrame(如我知道)。

myDstream.foreachRDD { rdd =>
    val conf = new SparkConf().setMaster("local").setAppName("My App")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc) 
    import sqlContext.implicits._
    rdd.toDF()
}

foreachRDD之外创建sqlContext,一旦你使用sqlContextrdd转换为DF,你就可以写入S3。

例如:

val conf = new SparkConf().setMaster("local").setAppName("My App")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc) 
import sqlContext.implicits._
myDstream.foreachRDD { rdd =>

    val df = rdd.toDF()
    df.write.format("json").saveAsTextFile("s3://iiiii/ttttt.json")
}

更新:

甚至您可以在 foreachRDD 中创建 sqlContext,它将在 Driver 上执行。

查看以下答案,其中包含 python 笔记本中的 scala 魔法单元: