"ValueArray is not declared" 在包含哈希映射的数据集上使用火花映射函数时出现异常

"ValueArray is not declared" exception when using spark map functions on dataset containing hash maps

我有一个class


@Getter
@Setter
@NoArgsConstructor
public class TestInput implements Serializable {
    private Map<String, String> key1;
}

和json文件

[{"key1": {"key2": "key3"}}]

我尝试使用 spark map 函数读取和操作数据集

Dataset<TestInput> input = sparkSession.read().json(inputPath).as(Encoders.bean(TestInput.class));

Dataset<TestInput> output = input.map((MapFunction<TestInput, TestInput>) x -> x, Encoders.bean(TestInput.class));

输入被正确读取,因为我能够查看 input.show() 的响应,但响应不包括实际地图

input.show()

+------+
|key1  |
+------+
|[key3]|
+------+

但是 map 函数失败(也将其转换为 Java POJO)并出现错误

A method named "valueArray" is not declared in any enclosing class nor any supertype, nor through a static import

我做错了什么?

此外,如果我将输入写回文件,我会取回原始文件 json,因此它可能理解映射,但无法转换为所需的 POJO

如果您查看 input 的架构,您会发现 key1 不是映射而是字符串数组。这就是您看到错误的原因。将您的模式设置为地图,您应该可以开始了。

Scala 代码

val expectedSchema = StructType(Seq(StructField("key1", MapType(StringType, StringType))))

val input = spark.read
  .schema(expectedSchema)
  .json(path)
  .as(Encoders.bean(classOf[TestInput]))

// key1 should be of map type
input.printSchema()

val out = input.map(
  (x: TestInput) => {
    x.setKey1((Map(("newkey", "newval")) ++ x.getKey1.asScala).asJava)
    x
  },
  Encoders.bean(classOf[TestInput]),
)

input.show(truncate = false)
out.show(truncate = false)