使用 scala 检查 csv 文件流上的文件
Check file on csv file streaming with scala
我正在使用 Spark Streaming,不想在新流文件每 10 分钟出现一次时处理旧文件:
val val1= spark
.read //
.option("header", "true")
.option("schema", "true")
.option("sep", ",")
.csv(path_to_file).toDF().cache()
val1.registerTempTable("test")
创建数据框后我做了一些转换和处理
检查点可以帮助我以及我在我的案例中的使用方式
*****************解决方案******************
val spark = SparkSession
.builder
.appName("test")
.config("spark.local", "local[*]")
.getOrCreate()
spark.sparkContext.setCheckpointDir(path_checkpoint)
在我调用数据帧上的检查点函数之后
我指定了一个触发器来执行作业
.writeStream
.format("csv")
.option("codec", "org.apache.hadoop.io.compress.GzipCodec")
.option("checkpointLocation",CheckPoint)
.trigger(Trigger.ProcessingTime("180 seconds"))
.option("Path",Path )
.option("header", true)
.outputMode("Append")
.queryName("test")
.start()
我正在使用 Spark Streaming,不想在新流文件每 10 分钟出现一次时处理旧文件:
val val1= spark
.read //
.option("header", "true")
.option("schema", "true")
.option("sep", ",")
.csv(path_to_file).toDF().cache()
val1.registerTempTable("test")
创建数据框后我做了一些转换和处理 检查点可以帮助我以及我在我的案例中的使用方式
*****************解决方案******************
val spark = SparkSession .builder .appName("test") .config("spark.local", "local[*]") .getOrCreate() spark.sparkContext.setCheckpointDir(path_checkpoint) 在我调用数据帧上的检查点函数之后 我指定了一个触发器来执行作业
.writeStream
.format("csv")
.option("codec", "org.apache.hadoop.io.compress.GzipCodec")
.option("checkpointLocation",CheckPoint)
.trigger(Trigger.ProcessingTime("180 seconds"))
.option("Path",Path )
.option("header", true)
.outputMode("Append")
.queryName("test")
.start()