如何在 Spark Streaming 中将 RDD 转换为 DataFrame,而不仅仅是 Spark
How to convert RDD to DataFrame in Spark Streaming, not just Spark
如何在 Spark Streaming
中将 RDD
转换为 DataFrame
,而不仅仅是 Spark
?
我看到了这个例子,但它需要 SparkContext
。
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
rdd.toDF()
就我而言,我有 StreamingContext
。然后我应该在 foreach
中创建 SparkContext
吗?看起来太疯狂了……那么,如何处理这个问题呢?我的最终目标(如果它可能有用的话)是使用 rdd.toDF.write.format("json").saveAsTextFile("s3://iiiii/ttttt.json");
在 Amazon S3 中保存 DataFrame
,如果不将其转换为 DataFrame
(如我知道)。
myDstream.foreachRDD { rdd =>
val conf = new SparkConf().setMaster("local").setAppName("My App")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
rdd.toDF()
}
在foreachRDD
之外创建sqlContext
,一旦你使用sqlContext
将rdd
转换为DF,你就可以写入S3。
例如:
val conf = new SparkConf().setMaster("local").setAppName("My App")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
myDstream.foreachRDD { rdd =>
val df = rdd.toDF()
df.write.format("json").saveAsTextFile("s3://iiiii/ttttt.json")
}
更新:
甚至您可以在 foreachRDD
中创建 sqlContext
,它将在 Driver 上执行。
查看以下答案,其中包含 python 笔记本中的 scala 魔法单元:
如何在 Spark Streaming
中将 RDD
转换为 DataFrame
,而不仅仅是 Spark
?
我看到了这个例子,但它需要 SparkContext
。
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
rdd.toDF()
就我而言,我有 StreamingContext
。然后我应该在 foreach
中创建 SparkContext
吗?看起来太疯狂了……那么,如何处理这个问题呢?我的最终目标(如果它可能有用的话)是使用 rdd.toDF.write.format("json").saveAsTextFile("s3://iiiii/ttttt.json");
在 Amazon S3 中保存 DataFrame
,如果不将其转换为 DataFrame
(如我知道)。
myDstream.foreachRDD { rdd =>
val conf = new SparkConf().setMaster("local").setAppName("My App")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
rdd.toDF()
}
在foreachRDD
之外创建sqlContext
,一旦你使用sqlContext
将rdd
转换为DF,你就可以写入S3。
例如:
val conf = new SparkConf().setMaster("local").setAppName("My App")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
myDstream.foreachRDD { rdd =>
val df = rdd.toDF()
df.write.format("json").saveAsTextFile("s3://iiiii/ttttt.json")
}
更新:
甚至您可以在 foreachRDD
中创建 sqlContext
,它将在 Driver 上执行。
查看以下答案,其中包含 python 笔记本中的 scala 魔法单元: