"ValueArray is not declared" 在包含哈希映射的数据集上使用火花映射函数时出现异常
"ValueArray is not declared" exception when using spark map functions on dataset containing hash maps
我有一个class
@Getter
@Setter
@NoArgsConstructor
public class TestInput implements Serializable {
private Map<String, String> key1;
}
和json文件
[{"key1": {"key2": "key3"}}]
我尝试使用 spark map 函数读取和操作数据集
Dataset<TestInput> input = sparkSession.read().json(inputPath).as(Encoders.bean(TestInput.class));
Dataset<TestInput> output = input.map((MapFunction<TestInput, TestInput>) x -> x, Encoders.bean(TestInput.class));
输入被正确读取,因为我能够查看 input.show() 的响应,但响应不包括实际地图
input.show()
+------+
|key1 |
+------+
|[key3]|
+------+
但是 map 函数失败(也将其转换为 Java POJO)并出现错误
A method named "valueArray" is not declared in any enclosing class nor any supertype, nor through a static import
我做错了什么?
此外,如果我将输入写回文件,我会取回原始文件 json,因此它可能理解映射,但无法转换为所需的 POJO
如果您查看 input
的架构,您会发现 key1
不是映射而是字符串数组。这就是您看到错误的原因。将您的模式设置为地图,您应该可以开始了。
Scala 代码
val expectedSchema = StructType(Seq(StructField("key1", MapType(StringType, StringType))))
val input = spark.read
.schema(expectedSchema)
.json(path)
.as(Encoders.bean(classOf[TestInput]))
// key1 should be of map type
input.printSchema()
val out = input.map(
(x: TestInput) => {
x.setKey1((Map(("newkey", "newval")) ++ x.getKey1.asScala).asJava)
x
},
Encoders.bean(classOf[TestInput]),
)
input.show(truncate = false)
out.show(truncate = false)
我有一个class
@Getter
@Setter
@NoArgsConstructor
public class TestInput implements Serializable {
private Map<String, String> key1;
}
和json文件
[{"key1": {"key2": "key3"}}]
我尝试使用 spark map 函数读取和操作数据集
Dataset<TestInput> input = sparkSession.read().json(inputPath).as(Encoders.bean(TestInput.class));
Dataset<TestInput> output = input.map((MapFunction<TestInput, TestInput>) x -> x, Encoders.bean(TestInput.class));
输入被正确读取,因为我能够查看 input.show() 的响应,但响应不包括实际地图
input.show()
+------+
|key1 |
+------+
|[key3]|
+------+
但是 map 函数失败(也将其转换为 Java POJO)并出现错误
A method named "valueArray" is not declared in any enclosing class nor any supertype, nor through a static import
我做错了什么?
此外,如果我将输入写回文件,我会取回原始文件 json,因此它可能理解映射,但无法转换为所需的 POJO
如果您查看 input
的架构,您会发现 key1
不是映射而是字符串数组。这就是您看到错误的原因。将您的模式设置为地图,您应该可以开始了。
Scala 代码
val expectedSchema = StructType(Seq(StructField("key1", MapType(StringType, StringType))))
val input = spark.read
.schema(expectedSchema)
.json(path)
.as(Encoders.bean(classOf[TestInput]))
// key1 should be of map type
input.printSchema()
val out = input.map(
(x: TestInput) => {
x.setKey1((Map(("newkey", "newval")) ++ x.getKey1.asScala).asJava)
x
},
Encoders.bean(classOf[TestInput]),
)
input.show(truncate = false)
out.show(truncate = false)