Spark Streaming:将 Dstream 批处理加入单个输出文件夹
Spark Streaming : Join Dstream batches into single output Folder
我正在使用 Spark Streaming 通过创建 StreamingContext 从 Twitter 获取推文:
val ssc = new StreamingContext("local[3]", "TwitterFeed",Minutes(1))
并将 Twitter 流创建为:
val tweetStream = TwitterUtils.createStream(ssc, Some(new OAuthAuthorization(Util.config)),filters)
然后将其保存为文本文件
tweets.repartition(1).saveAsTextFiles("/tmp/spark_testing/")
问题是推文是根据批次时间保存为文件夹,但我需要将每批次的所有数据都放在同一个文件夹中。
是否有解决方法?
谢谢
我们可以使用 Spark SQL 的新 DataFrame 保存 API 来做到这一点,它允许附加到现有输出。默认情况下,saveAsTextFile 将无法保存到包含现有数据的目录(请参阅 https://spark.apache.org/docs/latest/sql-programming-guide.html#save-modes ). https://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-and-sql-operations 介绍如何设置 Spark SQL 上下文以与 Spark Streaming 一起使用。
假设您使用 SQLContextSingleton 从指南中复制部分,生成的代码如下所示:
data.foreachRDD{rdd =>
val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
// Convert your data to a DataFrame, depends on the structure of your data
val df = ....
df.save("org.apache.spark.sql.json", SaveMode.Append, Map("path" -> path.toString))
}
(请注意,上面的示例使用 JSON 来保存结果,但您也可以使用不同的输出格式)。
我正在使用 Spark Streaming 通过创建 StreamingContext 从 Twitter 获取推文:
val ssc = new StreamingContext("local[3]", "TwitterFeed",Minutes(1))
并将 Twitter 流创建为:
val tweetStream = TwitterUtils.createStream(ssc, Some(new OAuthAuthorization(Util.config)),filters)
然后将其保存为文本文件
tweets.repartition(1).saveAsTextFiles("/tmp/spark_testing/")
问题是推文是根据批次时间保存为文件夹,但我需要将每批次的所有数据都放在同一个文件夹中。
是否有解决方法?
谢谢
我们可以使用 Spark SQL 的新 DataFrame 保存 API 来做到这一点,它允许附加到现有输出。默认情况下,saveAsTextFile 将无法保存到包含现有数据的目录(请参阅 https://spark.apache.org/docs/latest/sql-programming-guide.html#save-modes ). https://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-and-sql-operations 介绍如何设置 Spark SQL 上下文以与 Spark Streaming 一起使用。
假设您使用 SQLContextSingleton 从指南中复制部分,生成的代码如下所示:
data.foreachRDD{rdd =>
val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
// Convert your data to a DataFrame, depends on the structure of your data
val df = ....
df.save("org.apache.spark.sql.json", SaveMode.Append, Map("path" -> path.toString))
}
(请注意,上面的示例使用 JSON 来保存结果,但您也可以使用不同的输出格式)。