为什么在写入镶木地板时流式查询失败并显示 "InvalidSchemaException: A group type can not be empty (...) Empty group: spark_schema"?
Why does streaming query fail with "InvalidSchemaException: A group type can not be empty (...) Empty group: spark_schema" while writing to parquet?
我使用 Spark 2.2.1 和 Parquet 1.8.1。
我想从 Kafka 读取 JSON 数据并进行一些转换,然后将数据写入 parquet 文件,然后可以由 Apache Hive 加载这些文件。但是我在writeStream to parquet的时候遇到了下面的错误。
Caused by: org.apache.parquet.schema.InvalidSchemaException: A group type can not be empty. Parquet does not support empty group without leaves. Empty group: spark_schema
at org.apache.parquet.schema.GroupType.<init>(GroupType.java:92)
at org.apache.parquet.schema.GroupType.<init>(GroupType.java:48)
at org.apache.parquet.schema.MessageType.<init>(MessageType.java:50)
at org.apache.parquet.schema.Types$MessageTypeBuilder.named(Types.java:1256)
at org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter$.<init>(ParquetSchemaConverter.scala:563)
at org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter$.<clinit>(ParquetSchemaConverter.scala)
... 22 more
我用谷歌搜索并发现其他人遇到了相同的 ,其根本原因不是所有字段都是 parquet 不支持的叶字段,但在我的数据框中有所有叶字段。为什么?提前致谢!
这是我的代码:
val nestTimestampFormat = "yyyy-MM-dd'T'HH:mm:ss.sss'Z'"
val jsonOptions: Map[String, String] = Map{ "timestampFormat" -> nestTimestampFormat }
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "wikipedia-edits")
.option("startingOffsets", "earliest")
.option("group.id", "SparkProcessor")
.load()
.select(from_json(col("value").cast("string"), schema, jsonOptions) as "wikiEdit")
val parsed = df.select("wikiEdit.bot", "wikiEdit.title", "wikiEdit.user", "wikiEdit.wiki")
parsed.printSchema()
//parsed.writeStream.format("console").option("truncate", false).start().awaitTermination(30000)
parsed.writeStream.format("parquet")
.option("path","hdfs://localhost:9000/wiki-parquet-spark")
.option("checkpointLocation", "hdfs://localhost:9000/checkpoint")
.trigger(Trigger.ProcessingTime(10*1000))
.start.awaitTermination()
该程序可以打印架构并在数据框中显示一些数据。
root
|-- bot: boolean (nullable = true)
|-- title: string (nullable = true)
|-- user: string (nullable = true)
|-- wiki: string (nullable = true)
-------------------------------------------
Batch: 0
-------------------------------------------
+-----+-----------------------------+----------+-----------+
|bot |title |user |wiki |
+-----+-----------------------------+----------+-----------+
|false|Jak Roberto |WikiPedant|enwiki |
|false|File:Oostkamp01.jpg |Herpoel |commonswiki|
|false|Category:CC-BY-SA-4.0 |Herpoel |commonswiki|
|false|Category:Self-published work |Herpoel |commonswiki|
|false|Category:Geography of Belgium|Herpoel |commonswiki|
|false|Category:CC-BY-SA-4.0 |Herpoel |commonswiki|
|false|Category:Self-published work |Herpoel |commonswiki|
|false|Category:Geography of Belgium|Herpoel |commonswiki|
|false|Category:CC-BY-SA-4.0 |Herpoel |commonswiki|
|false|Category:Self-published work |Herpoel |commonswiki|
|false|Category:Geography of Belgium|Herpoel |commonswiki|
|false|Category:CC-BY-SA-4.0 |Herpoel |commonswiki|
|false|Category:Self-published work |Herpoel |commonswiki|
|false|Category:Geography of Belgium|Herpoel |commonswiki|
|false|Category:CC-BY-SA-4.0 |Herpoel |commonswiki|
|false|Category:Self-published work |Herpoel |commonswiki|
|false|Category:Geography of Belgium|Herpoel |commonswiki|
|true |Category:CC-BY-SA-4.0 |Herpoel |commonswiki|
|true |Category:Self-published work |Herpoel |commonswiki|
|true |Category:Geography of Belgium|Herpoel |commonswiki|
+-----+-----------------------------+----------+-----------+
only showing top 20 rows
TL;DR 升级到 Spark 2.2.0(或更好到 2.2.1)。
我认为它与PARQUET-363有关
无法为确实提到错误消息和 Spark 的 ReadContext.requestedSchema 构造空 MessageType。
In parquet-mr 1.8.1, constructing empty GroupType (and thus MessageType) is not allowed anymore (see PARQUET-278). This change makes sense in most cases since Parquet doesn't support empty groups. However, there is one use case where an empty MessageType is valid, namely passing an empty MessageType as the requestedSchema constructor argument of ReadContext when counting rows in a Parquet file. The reason why it works is that, Parquet can retrieve row count from block metadata without materializing any columns.
稍后在问题报告中:
We can see that Spark SQL passes no requested columns to the underlying Parquet reader.
该问题报告链接到 pull request #263,上面写着:
This removes the check added in PARQUET-278 that rejects schema groups
that have no fields. Selecting 0 columns from a file is allowed and used
by Hive and SparkSQL to implement queries like select count(1) ...
并且拉取请求确实删除了检查(您在流式数据集/Spark 结构化流中一直面临的)。
throw new InvalidSchemaException("A group type can not be empty. Parquet does not support empty group without leaves. Empty group: " + name);
据此我们了解到 Spark 中的 Parquet 版本可能与 1.8 发布分支不同。
这导致了 pull request that was eventually closed for the sake of another pull request 中的讨论,该讨论被接受为将 parquet 版本升级到 1.8.2 的一部分。这就是我们想要摆脱错误消息的 Parquet 版本。
由于 Spark 从 Spark 2.2.0 开始使用 Parquet 1.8.2,我的建议是升级到 Spark 2.2.0(甚至更好到 2.2.1)。
我使用 Spark 2.2.1 和 Parquet 1.8.1。
我想从 Kafka 读取 JSON 数据并进行一些转换,然后将数据写入 parquet 文件,然后可以由 Apache Hive 加载这些文件。但是我在writeStream to parquet的时候遇到了下面的错误。
Caused by: org.apache.parquet.schema.InvalidSchemaException: A group type can not be empty. Parquet does not support empty group without leaves. Empty group: spark_schema
at org.apache.parquet.schema.GroupType.<init>(GroupType.java:92)
at org.apache.parquet.schema.GroupType.<init>(GroupType.java:48)
at org.apache.parquet.schema.MessageType.<init>(MessageType.java:50)
at org.apache.parquet.schema.Types$MessageTypeBuilder.named(Types.java:1256)
at org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter$.<init>(ParquetSchemaConverter.scala:563)
at org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter$.<clinit>(ParquetSchemaConverter.scala)
... 22 more
我用谷歌搜索并发现其他人遇到了相同的
这是我的代码:
val nestTimestampFormat = "yyyy-MM-dd'T'HH:mm:ss.sss'Z'"
val jsonOptions: Map[String, String] = Map{ "timestampFormat" -> nestTimestampFormat }
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "wikipedia-edits")
.option("startingOffsets", "earliest")
.option("group.id", "SparkProcessor")
.load()
.select(from_json(col("value").cast("string"), schema, jsonOptions) as "wikiEdit")
val parsed = df.select("wikiEdit.bot", "wikiEdit.title", "wikiEdit.user", "wikiEdit.wiki")
parsed.printSchema()
//parsed.writeStream.format("console").option("truncate", false).start().awaitTermination(30000)
parsed.writeStream.format("parquet")
.option("path","hdfs://localhost:9000/wiki-parquet-spark")
.option("checkpointLocation", "hdfs://localhost:9000/checkpoint")
.trigger(Trigger.ProcessingTime(10*1000))
.start.awaitTermination()
该程序可以打印架构并在数据框中显示一些数据。
root
|-- bot: boolean (nullable = true)
|-- title: string (nullable = true)
|-- user: string (nullable = true)
|-- wiki: string (nullable = true)
-------------------------------------------
Batch: 0
-------------------------------------------
+-----+-----------------------------+----------+-----------+
|bot |title |user |wiki |
+-----+-----------------------------+----------+-----------+
|false|Jak Roberto |WikiPedant|enwiki |
|false|File:Oostkamp01.jpg |Herpoel |commonswiki|
|false|Category:CC-BY-SA-4.0 |Herpoel |commonswiki|
|false|Category:Self-published work |Herpoel |commonswiki|
|false|Category:Geography of Belgium|Herpoel |commonswiki|
|false|Category:CC-BY-SA-4.0 |Herpoel |commonswiki|
|false|Category:Self-published work |Herpoel |commonswiki|
|false|Category:Geography of Belgium|Herpoel |commonswiki|
|false|Category:CC-BY-SA-4.0 |Herpoel |commonswiki|
|false|Category:Self-published work |Herpoel |commonswiki|
|false|Category:Geography of Belgium|Herpoel |commonswiki|
|false|Category:CC-BY-SA-4.0 |Herpoel |commonswiki|
|false|Category:Self-published work |Herpoel |commonswiki|
|false|Category:Geography of Belgium|Herpoel |commonswiki|
|false|Category:CC-BY-SA-4.0 |Herpoel |commonswiki|
|false|Category:Self-published work |Herpoel |commonswiki|
|false|Category:Geography of Belgium|Herpoel |commonswiki|
|true |Category:CC-BY-SA-4.0 |Herpoel |commonswiki|
|true |Category:Self-published work |Herpoel |commonswiki|
|true |Category:Geography of Belgium|Herpoel |commonswiki|
+-----+-----------------------------+----------+-----------+
only showing top 20 rows
TL;DR 升级到 Spark 2.2.0(或更好到 2.2.1)。
我认为它与PARQUET-363有关 无法为确实提到错误消息和 Spark 的 ReadContext.requestedSchema 构造空 MessageType。
In parquet-mr 1.8.1, constructing empty GroupType (and thus MessageType) is not allowed anymore (see PARQUET-278). This change makes sense in most cases since Parquet doesn't support empty groups. However, there is one use case where an empty MessageType is valid, namely passing an empty MessageType as the requestedSchema constructor argument of ReadContext when counting rows in a Parquet file. The reason why it works is that, Parquet can retrieve row count from block metadata without materializing any columns.
稍后在问题报告中:
We can see that Spark SQL passes no requested columns to the underlying Parquet reader.
该问题报告链接到 pull request #263,上面写着:
This removes the check added in PARQUET-278 that rejects schema groups that have no fields. Selecting 0 columns from a file is allowed and used by Hive and SparkSQL to implement queries like select count(1) ...
并且拉取请求确实删除了检查(您在流式数据集/Spark 结构化流中一直面临的)。
throw new InvalidSchemaException("A group type can not be empty. Parquet does not support empty group without leaves. Empty group: " + name);
据此我们了解到 Spark 中的 Parquet 版本可能与 1.8 发布分支不同。
这导致了 pull request that was eventually closed for the sake of another pull request 中的讨论,该讨论被接受为将 parquet 版本升级到 1.8.2 的一部分。这就是我们想要摆脱错误消息的 Parquet 版本。
由于 Spark 从 Spark 2.2.0 开始使用 Parquet 1.8.2,我的建议是升级到 Spark 2.2.0(甚至更好到 2.2.1)。