如何使用 spark streaming 读取文件并使用 Scala 写入简单文件?

How to read a file using sparkstreaming and write to a simple file using Scala?

我正在尝试使用 scala SparkStreaming 程序读取文件。该文件存储在我本地机器上的一个目录中,并试图将它作为一个新文件写入我的本地机器上。但是每当我编写流并将其存储为镶木地板时,我最终都会得到空白文件夹。

这是我的代码:

 Logger.getLogger("org").setLevel(Level.ERROR)
 val spark = SparkSession
             .builder()
             .master("local[*]")
             .appName("StreamAFile")
             .config("spark.sql.warehouse.dir", "file:///C:/temp")
             .getOrCreate()
 
         
 import spark.implicits._            
 val schemaforfile = new StructType().add("SrNo",IntegerType).add("Name",StringType).add("Age",IntegerType).add("Friends",IntegerType)
             
 val file = spark.readStream.schema(schemaforfile).csv("C:\SparkScala\fakefriends.csv")  

 file.writeStream.format("parquet").start("C:\Users\roswal01\Desktop\streamed") 
 
 spark.stop()
 

我的代码中是否有任何遗漏或代码中哪里出错了?

我也尝试从 hdfs 位置读取此文件,但相同的代码最终没有在我的 hdfs 上创建任何输出文件夹。

你错了:

val file = spark.readStream.schema(schemaforfile).csv("C:\SparkScala\fakefriends.csv")  

csv() 函数应将目录路径作为参数。它将扫描这个目录并在它们被移动到这个目录时读取所有新文件

对于检查点,您应该添加

.option("checkpointLocation", "path/to/HDFS/dir")

例如:

val query = file.writeStream.format("parquet")
    .option("checkpointLocation", "path/to/HDFS/dir")
    .start("C:\Users\roswal01\Desktop\streamed") 

query.awaitTermination()