在 spark-streaming 上下文中将 RDD 写入 HDFS
write an RDD into HDFS in a spark-streaming context
我有一个带有 spark 1.2.0 的 spark 流环境,我从本地文件夹中检索数据,每次我发现一个新文件添加到该文件夹时,我都会执行一些转换。
val ssc = new StreamingContext(sc, Seconds(10))
val data = ssc.textFileStream(directory)
为了对 DStream 数据进行分析,我必须将其转换为数组
var arr = new ArrayBuffer[String]();
data.foreachRDD {
arr ++= _.collect()
}
然后我用获取到的数据提取出我想要的信息,保存在HDFS上。
val myRDD = sc.parallelize(arr)
myRDD.saveAsTextFile("hdfs directory....")
因为我真的需要用数组操作数据,所以不可能用 DStream.saveAsTextFiles("...")
将数据保存在 HDFS 上(这可以正常工作)而且我必须保存 RDD 但是通过这个程序我最终得到了空输出名为 part-00000 等的文件...
通过 arr.foreach(println)
我能够看到正确的转换结果。
我怀疑 spark 会尝试在每个批次中将数据写入相同的文件,删除之前写入的内容。我试图保存在一个动态命名的文件夹中,如 myRDD.saveAsTextFile("folder" + System.currentTimeMillis().toString())
但总是只创建一个文件夹,输出文件仍然是空的。
如何在 spark-streaming 上下文中将 RDD 写入 HDFS?
您正在以非设计的方式使用 Spark Streaming。我建议在您的用例中放弃使用 Spark,或者调整您的代码以使其以 Spark 方式工作。将数组收集到驱动程序违背了使用分布式引擎的目的,并使您的应用程序有效地成为单机(两台机器也会导致比仅在单机上处理数据更多的开销)。
你可以用数组做的一切,你也可以用 Spark 做。因此,只需 运行 您在流中的计算,分布在工作人员上,并使用 DStream.saveAsTextFiles()
编写您的输出。您可以使用 foreachRDD
+ saveAsParquet(path, overwrite = true)
写入单个文件。
@vzamboni:Spark 1.5+ 数据帧 api 具有此功能:
dataframe.write().mode(SaveMode.Append).format(FILE_FORMAT).partitionBy("parameter1", "parameter2").save(path);
我有一个带有 spark 1.2.0 的 spark 流环境,我从本地文件夹中检索数据,每次我发现一个新文件添加到该文件夹时,我都会执行一些转换。
val ssc = new StreamingContext(sc, Seconds(10))
val data = ssc.textFileStream(directory)
为了对 DStream 数据进行分析,我必须将其转换为数组
var arr = new ArrayBuffer[String]();
data.foreachRDD {
arr ++= _.collect()
}
然后我用获取到的数据提取出我想要的信息,保存在HDFS上。
val myRDD = sc.parallelize(arr)
myRDD.saveAsTextFile("hdfs directory....")
因为我真的需要用数组操作数据,所以不可能用 DStream.saveAsTextFiles("...")
将数据保存在 HDFS 上(这可以正常工作)而且我必须保存 RDD 但是通过这个程序我最终得到了空输出名为 part-00000 等的文件...
通过 arr.foreach(println)
我能够看到正确的转换结果。
我怀疑 spark 会尝试在每个批次中将数据写入相同的文件,删除之前写入的内容。我试图保存在一个动态命名的文件夹中,如 myRDD.saveAsTextFile("folder" + System.currentTimeMillis().toString())
但总是只创建一个文件夹,输出文件仍然是空的。
如何在 spark-streaming 上下文中将 RDD 写入 HDFS?
您正在以非设计的方式使用 Spark Streaming。我建议在您的用例中放弃使用 Spark,或者调整您的代码以使其以 Spark 方式工作。将数组收集到驱动程序违背了使用分布式引擎的目的,并使您的应用程序有效地成为单机(两台机器也会导致比仅在单机上处理数据更多的开销)。
你可以用数组做的一切,你也可以用 Spark 做。因此,只需 运行 您在流中的计算,分布在工作人员上,并使用 DStream.saveAsTextFiles()
编写您的输出。您可以使用 foreachRDD
+ saveAsParquet(path, overwrite = true)
写入单个文件。
@vzamboni:Spark 1.5+ 数据帧 api 具有此功能:
dataframe.write().mode(SaveMode.Append).format(FILE_FORMAT).partitionBy("parameter1", "parameter2").save(path);