无法使用 Spark Structured Streaming 在 Parquet 文件中写入数据
Not able to write Data in Parquet File using Spark Structured Streaming
我有一个 Spark 结构化流:
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.option("subscribe", "topic")
.load()
我想使用 DataStreamWriter 将数据写入文件系统,
val query = df
.writeStream
.outputMode("append")
.format("parquet")
.start("data")
但是在 data
文件夹中创建了零个文件。仅 _spark_metadata
正在创建。
但是,当 format
为 console
时,我可以在控制台上看到数据:
val query = df
.writeStream
.outputMode("append")
.format("console")
.start()
+--------------------+------------------+------------------+
| time| col1| col2|
+--------------------+------------------+------------------+
|49368-05-11 20:42...|0.9166470338147503|0.5576946794171861|
+--------------------+------------------+------------------+
我不明白背后的原因。
Spark - 2.1.0
我解决了这个问题。实际上,当我尝试 运行 spark-shell
上的结构化流时,它给出了一个错误,即 endingOffsets
在流查询中无效,即:
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.option("subscribe", "topic")
.load()
java.lang.IllegalArgumentException: ending offset not valid in streaming queries
at org.apache.spark.sql.kafka010.KafkaSourceProvider$$anonfun$validateStreamOptions.apply(KafkaSourceProvider.scala:374)
at org.apache.spark.sql.kafka010.KafkaSourceProvider$$anonfun$validateStreamOptions.apply(KafkaSourceProvider.scala:373)
at scala.Option.map(Option.scala:146)
at org.apache.spark.sql.kafka010.KafkaSourceProvider.validateStreamOptions(KafkaSourceProvider.scala:373)
at org.apache.spark.sql.kafka010.KafkaSourceProvider.sourceSchema(KafkaSourceProvider.scala:60)
at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:199)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:87)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:87)
at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:124)
... 48 elided
因此,我从流式查询中删除了 endingOffsets
。
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("startingOffsets", "earliest")
.option("subscribe", "topic")
.load()
然后我尝试将流式查询的结果保存在 Parquet 文件中,在此期间我开始知道 - 必须指定检查点位置,即:
val query = df
.writeStream
.outputMode("append")
.format("parquet")
.start("data")
org.apache.spark.sql.AnalysisException: checkpointLocation must be specified either through option("checkpointLocation", ...) or SparkSession.conf.set("spark.sql.streaming.checkpointLocation", ...);
at org.apache.spark.sql.streaming.StreamingQueryManager$$anonfun.apply(StreamingQueryManager.scala:207)
at org.apache.spark.sql.streaming.StreamingQueryManager$$anonfun.apply(StreamingQueryManager.scala:204)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:203)
at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:269)
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:262)
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:206)
... 48 elided
所以,我添加了 checkPointLocation
:
val query = df
.writeStream
.outputMode("append")
.format("parquet")
.option("checkpointLocation", "checkpoint")
.start("data")
进行这些修改后,我能够将流式查询的结果保存在 Parquet 文件中。
但是,当我通过 sbt
应用程序 运行 相同的代码时,它没有抛出任何错误,但是当我 运行 通过 spark-shell
抛出这些错误的相同代码。我认为 Apache Spark 也应该通过 sbt
/maven
应用程序在 运行 时抛出这些错误。这对我来说似乎是一个错误!
我遇到了类似的问题,但出于不同的原因,在这里发帖以防有人遇到同样的问题。当以带水印的追加模式将输出流写入文件时,结构化流式传输有一个有趣的行为,即它不会实际写入任何数据,直到时间桶早于水印时间。如果您正在测试结构化流式传输并且有一个小时长的水印,您将至少在一个小时内看不到任何输出。
我有一个 Spark 结构化流:
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.option("subscribe", "topic")
.load()
我想使用 DataStreamWriter 将数据写入文件系统,
val query = df
.writeStream
.outputMode("append")
.format("parquet")
.start("data")
但是在 data
文件夹中创建了零个文件。仅 _spark_metadata
正在创建。
但是,当 format
为 console
时,我可以在控制台上看到数据:
val query = df
.writeStream
.outputMode("append")
.format("console")
.start()
+--------------------+------------------+------------------+
| time| col1| col2|
+--------------------+------------------+------------------+
|49368-05-11 20:42...|0.9166470338147503|0.5576946794171861|
+--------------------+------------------+------------------+
我不明白背后的原因。
Spark - 2.1.0
我解决了这个问题。实际上,当我尝试 运行 spark-shell
上的结构化流时,它给出了一个错误,即 endingOffsets
在流查询中无效,即:
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.option("subscribe", "topic")
.load()
java.lang.IllegalArgumentException: ending offset not valid in streaming queries
at org.apache.spark.sql.kafka010.KafkaSourceProvider$$anonfun$validateStreamOptions.apply(KafkaSourceProvider.scala:374)
at org.apache.spark.sql.kafka010.KafkaSourceProvider$$anonfun$validateStreamOptions.apply(KafkaSourceProvider.scala:373)
at scala.Option.map(Option.scala:146)
at org.apache.spark.sql.kafka010.KafkaSourceProvider.validateStreamOptions(KafkaSourceProvider.scala:373)
at org.apache.spark.sql.kafka010.KafkaSourceProvider.sourceSchema(KafkaSourceProvider.scala:60)
at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:199)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:87)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:87)
at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:124)
... 48 elided
因此,我从流式查询中删除了 endingOffsets
。
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("startingOffsets", "earliest")
.option("subscribe", "topic")
.load()
然后我尝试将流式查询的结果保存在 Parquet 文件中,在此期间我开始知道 - 必须指定检查点位置,即:
val query = df
.writeStream
.outputMode("append")
.format("parquet")
.start("data")
org.apache.spark.sql.AnalysisException: checkpointLocation must be specified either through option("checkpointLocation", ...) or SparkSession.conf.set("spark.sql.streaming.checkpointLocation", ...);
at org.apache.spark.sql.streaming.StreamingQueryManager$$anonfun.apply(StreamingQueryManager.scala:207)
at org.apache.spark.sql.streaming.StreamingQueryManager$$anonfun.apply(StreamingQueryManager.scala:204)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:203)
at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:269)
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:262)
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:206)
... 48 elided
所以,我添加了 checkPointLocation
:
val query = df
.writeStream
.outputMode("append")
.format("parquet")
.option("checkpointLocation", "checkpoint")
.start("data")
进行这些修改后,我能够将流式查询的结果保存在 Parquet 文件中。
但是,当我通过 sbt
应用程序 运行 相同的代码时,它没有抛出任何错误,但是当我 运行 通过 spark-shell
抛出这些错误的相同代码。我认为 Apache Spark 也应该通过 sbt
/maven
应用程序在 运行 时抛出这些错误。这对我来说似乎是一个错误!
我遇到了类似的问题,但出于不同的原因,在这里发帖以防有人遇到同样的问题。当以带水印的追加模式将输出流写入文件时,结构化流式传输有一个有趣的行为,即它不会实际写入任何数据,直到时间桶早于水印时间。如果您正在测试结构化流式传输并且有一个小时长的水印,您将至少在一个小时内看不到任何输出。