如何将 Spark Streaming 应用程序的输出写入单个文件
How to write outputs of spark streaming application to a single file
我正在使用 spark 流从 Kafka 读取数据并传递到 py 文件进行预测。它 returns 预测以及原始数据。它会将原始数据及其预测保存到文件中,但它会为每个 RDD 创建一个文件。
我需要一个包含所有收集的数据的文件,直到我停止要保存到单个文件的程序。
我试过 writeStream 它甚至不创建一个文件。
我试图使用 append 将它保存到镶木地板,但它创建了多个文件,每个 RDD 为 1 个。
我尝试使用追加模式编写仍然有多个文件作为输出。
下面的代码创建一个文件夹 output.csv 并将所有文件输入其中。
def main(args: Array[String]): Unit = {
val ss = SparkSession.builder()
.appName("consumer")
.master("local[*]")
.getOrCreate()
val scc = new StreamingContext(ss.sparkContext, Seconds(2))
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer"->
"org.apache.kafka.common.serialization.StringDeserializer",
"value.deserializer">
"org.apache.kafka.common.serialization.StringDeserializer",
"group.id"-> "group5" // clients can take
)
mappedData.foreachRDD(
x =>
x.map(y =>
ss.sparkContext.makeRDD(List(y)).pipe(pyPath).toDF().repartition(1)
.write.format("csv").mode("append").option("truncate","false")
.save("output.csv")
)
)
scc.start()
scc.awaitTermination()
我只需要获取 1 个文件,其中包含在流式传输时一一收集的所有语句。
任何帮助将不胜感激,谢谢你的期待。
hdfs 中的任何文件一旦写入就无法修改。如果您希望实时写入文件(每 2 秒将来自流作业的数据块附加到同一个文件中),则根本不允许这样做,因为 hdfs 文件是 immutable。如果可能的话,我建议你尝试编写一个从多个文件读取的读取逻辑。
但是,如果您必须从单个文件读取,我建议使用两种方法之一,在将输出写入单个 csv/parquet 文件夹后,使用 "Append" SaveMode(这将为每 2 秒写入的每个块创建零件文件)。
- 您可以在此文件夹之上创建配置单元 table,从该文件夹读取数据 table。
您可以在 spark 中编写一个简单的逻辑来读取包含多个文件的文件夹,然后使用 reparation(1) 或 coalesce(1) 将其作为单个文件写入另一个 hdfs 位置,然后读取来自该位置的数据。见下文:
spark.read.csv("oldLocation").coalesce(1).write.csv("newLocation")
重新分区 - 建议在增加分区数的同时使用重新分区,因为它涉及所有数据的混洗。
coalesce- 建议在减少分区数的同时使用 coalesce。例如,如果您有 3 个分区并且您想将其减少为 2 个分区,Coalesce 会将第 3 个分区数据移动到分区 1 和 2。分区 1 和 2 将保持不变 Container.but 重新分区将在所有分区中随机播放数据,因此执行器之间的网络使用率会很高,这会影响性能。
性能明智的合并性能优于重新分区,同时减少分区数量。
所以在编写时使用选项作为合并。
例如:df.write.coalesce
我正在使用 spark 流从 Kafka 读取数据并传递到 py 文件进行预测。它 returns 预测以及原始数据。它会将原始数据及其预测保存到文件中,但它会为每个 RDD 创建一个文件。 我需要一个包含所有收集的数据的文件,直到我停止要保存到单个文件的程序。
我试过 writeStream 它甚至不创建一个文件。 我试图使用 append 将它保存到镶木地板,但它创建了多个文件,每个 RDD 为 1 个。 我尝试使用追加模式编写仍然有多个文件作为输出。 下面的代码创建一个文件夹 output.csv 并将所有文件输入其中。
def main(args: Array[String]): Unit = {
val ss = SparkSession.builder()
.appName("consumer")
.master("local[*]")
.getOrCreate()
val scc = new StreamingContext(ss.sparkContext, Seconds(2))
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer"->
"org.apache.kafka.common.serialization.StringDeserializer",
"value.deserializer">
"org.apache.kafka.common.serialization.StringDeserializer",
"group.id"-> "group5" // clients can take
)
mappedData.foreachRDD(
x =>
x.map(y =>
ss.sparkContext.makeRDD(List(y)).pipe(pyPath).toDF().repartition(1)
.write.format("csv").mode("append").option("truncate","false")
.save("output.csv")
)
)
scc.start()
scc.awaitTermination()
我只需要获取 1 个文件,其中包含在流式传输时一一收集的所有语句。
任何帮助将不胜感激,谢谢你的期待。
hdfs 中的任何文件一旦写入就无法修改。如果您希望实时写入文件(每 2 秒将来自流作业的数据块附加到同一个文件中),则根本不允许这样做,因为 hdfs 文件是 immutable。如果可能的话,我建议你尝试编写一个从多个文件读取的读取逻辑。
但是,如果您必须从单个文件读取,我建议使用两种方法之一,在将输出写入单个 csv/parquet 文件夹后,使用 "Append" SaveMode(这将为每 2 秒写入的每个块创建零件文件)。
- 您可以在此文件夹之上创建配置单元 table,从该文件夹读取数据 table。
您可以在 spark 中编写一个简单的逻辑来读取包含多个文件的文件夹,然后使用 reparation(1) 或 coalesce(1) 将其作为单个文件写入另一个 hdfs 位置,然后读取来自该位置的数据。见下文:
spark.read.csv("oldLocation").coalesce(1).write.csv("newLocation")
重新分区 - 建议在增加分区数的同时使用重新分区,因为它涉及所有数据的混洗。
coalesce- 建议在减少分区数的同时使用 coalesce。例如,如果您有 3 个分区并且您想将其减少为 2 个分区,Coalesce 会将第 3 个分区数据移动到分区 1 和 2。分区 1 和 2 将保持不变 Container.but 重新分区将在所有分区中随机播放数据,因此执行器之间的网络使用率会很高,这会影响性能。
性能明智的合并性能优于重新分区,同时减少分区数量。
所以在编写时使用选项作为合并。 例如:df.write.coalesce