在我停止作业之前,Spark Structured Streaming writestream 不会写入文件
Spark Structured Streaming writestream doesn't write file until I stop the job
我在一个经典用例中使用 Spark Structured Streaming:我想读取一个 kafka 主题并将流以 parquet 格式写入 HDFS。
这是我的代码:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.{ArrayType, DataTypes, StructType}
object TestKafkaReader extends App{
val spark = SparkSession
.builder
.appName("Spark-Kafka-Integration")
.master("local")
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
import spark.implicits._
val kafkaDf = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers","KAFKA_BROKER_IP:PORT")
//.option("subscribe", "test")
.option("subscribe", "test")
.option("startingOffsets", "earliest")
.load()
val moviesJsonDf = kafkaDf.selectExpr("CAST(value AS STRING)")
// movie struct
val struct = new StructType()
.add("title", DataTypes.StringType)
.add("year", DataTypes.IntegerType)
.add("cast", ArrayType(DataTypes.StringType))
.add("genres", ArrayType(DataTypes.StringType))
val moviesNestedDf = moviesJsonDf.select(from_json($"value", struct).as("movie"))
// json flatten
val movieFlattenedDf = moviesNestedDf.selectExpr("movie.title", "movie.year", "movie.cast","movie.genres")
// convert to parquet and save to hdfs
val query = movieFlattenedDf
.writeStream
.outputMode("append")
.format("parquet")
.queryName("movies")
.option("checkpointLocation", "src/main/resources/chkpoint_dir")
.start("src/main/resources/output")
.awaitTermination()
}
上下文:
- 我 运行 直接从 intellij(使用本地 spark
安装)
- 我成功地从 kafka 读取并写入
控制台(使用控制台模式)
- 目前我想写文件
在本地机器上(但我确实在 HDFS 集群上尝试过,问题是
相同)
我的问题:
在工作期间,它没有在文件夹中写入任何内容,我必须手动停止工作才能最终看到文件。
我认为可能与 .awaitTermination()
有关
有关信息,我尝试删除此选项,但如果没有删除此选项,我会收到错误消息,并且作业根本不会 运行.
也许我没有设置正确的选项,但在多次阅读文档并搜索 Google 后,我没有找到任何东西。
你能帮我解决这个问题吗?
谢谢
编辑:
- 我正在使用 spark 2.4.0
- 我尝试了 64/128mb 格式 => 在我停止作业之前没有任何改变没有文件
是问题解决
我的问题是,我的数据太少,而 spark 正在等待更多数据来写入 parquet 文件。
为了完成这项工作,我使用了@AlexandrosBiratsis 的评论
(更改块大小)
再次归功于@AlexandrosBiratsis
非常感谢
我在一个经典用例中使用 Spark Structured Streaming:我想读取一个 kafka 主题并将流以 parquet 格式写入 HDFS。
这是我的代码:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.{ArrayType, DataTypes, StructType}
object TestKafkaReader extends App{
val spark = SparkSession
.builder
.appName("Spark-Kafka-Integration")
.master("local")
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
import spark.implicits._
val kafkaDf = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers","KAFKA_BROKER_IP:PORT")
//.option("subscribe", "test")
.option("subscribe", "test")
.option("startingOffsets", "earliest")
.load()
val moviesJsonDf = kafkaDf.selectExpr("CAST(value AS STRING)")
// movie struct
val struct = new StructType()
.add("title", DataTypes.StringType)
.add("year", DataTypes.IntegerType)
.add("cast", ArrayType(DataTypes.StringType))
.add("genres", ArrayType(DataTypes.StringType))
val moviesNestedDf = moviesJsonDf.select(from_json($"value", struct).as("movie"))
// json flatten
val movieFlattenedDf = moviesNestedDf.selectExpr("movie.title", "movie.year", "movie.cast","movie.genres")
// convert to parquet and save to hdfs
val query = movieFlattenedDf
.writeStream
.outputMode("append")
.format("parquet")
.queryName("movies")
.option("checkpointLocation", "src/main/resources/chkpoint_dir")
.start("src/main/resources/output")
.awaitTermination()
}
上下文:
- 我 运行 直接从 intellij(使用本地 spark 安装)
- 我成功地从 kafka 读取并写入 控制台(使用控制台模式)
- 目前我想写文件 在本地机器上(但我确实在 HDFS 集群上尝试过,问题是 相同)
我的问题:
在工作期间,它没有在文件夹中写入任何内容,我必须手动停止工作才能最终看到文件。
我认为可能与 .awaitTermination()
有关
有关信息,我尝试删除此选项,但如果没有删除此选项,我会收到错误消息,并且作业根本不会 运行.
也许我没有设置正确的选项,但在多次阅读文档并搜索 Google 后,我没有找到任何东西。
你能帮我解决这个问题吗?
谢谢
编辑:
- 我正在使用 spark 2.4.0
- 我尝试了 64/128mb 格式 => 在我停止作业之前没有任何改变没有文件
是问题解决
我的问题是,我的数据太少,而 spark 正在等待更多数据来写入 parquet 文件。
为了完成这项工作,我使用了@AlexandrosBiratsis 的评论 (更改块大小)
再次归功于@AlexandrosBiratsis 非常感谢