如何使用 spark streaming 读取文件并使用 Scala 写入简单文件?
How to read a file using sparkstreaming and write to a simple file using Scala?
我正在尝试使用 scala SparkStreaming 程序读取文件。该文件存储在我本地机器上的一个目录中,并试图将它作为一个新文件写入我的本地机器上。但是每当我编写流并将其存储为镶木地板时,我最终都会得到空白文件夹。
这是我的代码:
Logger.getLogger("org").setLevel(Level.ERROR)
val spark = SparkSession
.builder()
.master("local[*]")
.appName("StreamAFile")
.config("spark.sql.warehouse.dir", "file:///C:/temp")
.getOrCreate()
import spark.implicits._
val schemaforfile = new StructType().add("SrNo",IntegerType).add("Name",StringType).add("Age",IntegerType).add("Friends",IntegerType)
val file = spark.readStream.schema(schemaforfile).csv("C:\SparkScala\fakefriends.csv")
file.writeStream.format("parquet").start("C:\Users\roswal01\Desktop\streamed")
spark.stop()
我的代码中是否有任何遗漏或代码中哪里出错了?
我也尝试从 hdfs 位置读取此文件,但相同的代码最终没有在我的 hdfs 上创建任何输出文件夹。
你错了:
val file = spark.readStream.schema(schemaforfile).csv("C:\SparkScala\fakefriends.csv")
csv() 函数应将目录路径作为参数。它将扫描这个目录并在它们被移动到这个目录时读取所有新文件
对于检查点,您应该添加
.option("checkpointLocation", "path/to/HDFS/dir")
例如:
val query = file.writeStream.format("parquet")
.option("checkpointLocation", "path/to/HDFS/dir")
.start("C:\Users\roswal01\Desktop\streamed")
query.awaitTermination()
我正在尝试使用 scala SparkStreaming 程序读取文件。该文件存储在我本地机器上的一个目录中,并试图将它作为一个新文件写入我的本地机器上。但是每当我编写流并将其存储为镶木地板时,我最终都会得到空白文件夹。
这是我的代码:
Logger.getLogger("org").setLevel(Level.ERROR)
val spark = SparkSession
.builder()
.master("local[*]")
.appName("StreamAFile")
.config("spark.sql.warehouse.dir", "file:///C:/temp")
.getOrCreate()
import spark.implicits._
val schemaforfile = new StructType().add("SrNo",IntegerType).add("Name",StringType).add("Age",IntegerType).add("Friends",IntegerType)
val file = spark.readStream.schema(schemaforfile).csv("C:\SparkScala\fakefriends.csv")
file.writeStream.format("parquet").start("C:\Users\roswal01\Desktop\streamed")
spark.stop()
我的代码中是否有任何遗漏或代码中哪里出错了?
我也尝试从 hdfs 位置读取此文件,但相同的代码最终没有在我的 hdfs 上创建任何输出文件夹。
你错了:
val file = spark.readStream.schema(schemaforfile).csv("C:\SparkScala\fakefriends.csv")
csv() 函数应将目录路径作为参数。它将扫描这个目录并在它们被移动到这个目录时读取所有新文件
对于检查点,您应该添加
.option("checkpointLocation", "path/to/HDFS/dir")
例如:
val query = file.writeStream.format("parquet")
.option("checkpointLocation", "path/to/HDFS/dir")
.start("C:\Users\roswal01\Desktop\streamed")
query.awaitTermination()