在 spark-streaming 上下文中将 RDD 写入 HDFS

write an RDD into HDFS in a spark-streaming context

我有一个带有 spark 1.2.0 的 spark 流环境,我从本地文件夹中检索数据,每次我发现一个新文件添加到该文件夹​​时,我都会执行一些转换。

val ssc = new StreamingContext(sc, Seconds(10))
val data = ssc.textFileStream(directory)

为了对 DStream 数据进行分析,我必须将其转换为数组

var arr = new ArrayBuffer[String]();
   data.foreachRDD {
   arr ++= _.collect()
}

然后我用获取到的数据提取出我想要的信息,保存在HDFS上。

val myRDD  = sc.parallelize(arr)
myRDD.saveAsTextFile("hdfs directory....")

因为我真的需要用数组操作数据,所以不可能用 DStream.saveAsTextFiles("...") 将数据保存在 HDFS 上(这可以正常工作)而且我必须保存 RDD 但是通过这个程序我最终得到了空输出名为 part-00000 等的文件...

通过 arr.foreach(println) 我能够看到正确的转换结果。

我怀疑 spark 会尝试在每个批次中将数据写入相同的文件,删除之前写入的内容。我试图保存在一个动态命名的文件夹中,如 myRDD.saveAsTextFile("folder" + System.currentTimeMillis().toString()) 但总是只创建一个文件夹,输出文件仍然是空的。

如何在 spark-streaming 上下文中将 RDD 写入 HDFS?

您正在以非设计的方式使用 Spark Streaming。我建议在您的用例中放弃使用 Spark,或者调整您的代码以使其以 Spark 方式工作。将数组收集到驱动程序违背了使用分布式引擎的目的,并使您的应用程序有效地成为单机(两台机器也会导致比仅在单机上处理数据更多的开销)。

你可以用数组做的一切,你也可以用 Spark 做。因此,只需 运行 您在流中的计算,分布在工作人员上,并使用 DStream.saveAsTextFiles() 编写您的输出。您可以使用 foreachRDD + saveAsParquet(path, overwrite = true) 写入单个文件。

@vzamboni:Spark 1.5+ 数据帧 api 具有此功能:

dataframe.write().mode(SaveMode.Append).format(FILE_FORMAT).partitionBy("parameter1", "parameter2").save(path);