如何使用 java 对 spark 数据集中的可选字段进行编码?

How to encode optional fields in spark dataset with java?

我不想对数据集中使用的 class 字段使用空值。我尝试使用 scala Option 和 java Optional 但它失败了:

    @AllArgsConstructor // lombok
    @NoArgsConstructor  // mutable type is required in java :(
    @Data               // see 
    public static class TestClass {
        String id;
        Option<Integer> optionalInt;
    }

    @Test
    public void testDatasetWithOptionField(){
        Dataset<TestClass> ds = spark.createDataset(Arrays.asList(
                new TestClass("item 1", Option.apply(1)),
                new TestClass("item .", Option.empty())
        ), Encoders.bean(TestClass.class));

        ds.collectAsList().forEach(x -> System.out.println("Found " + x));
    }

运行时失败,消息为 File 'generated.java', Line 77, Column 47: Cannot instantiate abstract "scala.Option"


问题:有没有一种方法可以使用 java 对数据集中不带 null 的可选字段进行编码?

附属问题:顺便说一句,我也没有在 scala 中使用太多数据集,你能验证一下在 scala 中是否真的可以对包含选项字段的案例 class 进行编码?


注意:这用于中间数据集,即既不能读也不能写的东西(但用于 spark 内部序列化)

这在 Scala 中相当简单。

Scala 实现

import org.apache.spark.sql.{Encoders, SparkSession}

object Test {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("Stack-scala")
      .master("local[2]")
      .getOrCreate()

    val ds = spark.createDataset(Seq(
      TestClass("Item 1", Some(1)),
      TestClass("Item 2", None)
    ))( Encoders.product[TestClass])

    ds.collectAsList().forEach(println)

    spark.stop()
  }

  case class TestClass(
    id: String,
    optionalInt: Option[Int] )
}

Java

Java 中有各种选项 class。然而,none 他们工作 out-of-the-box。

  1. java.util.Optional : 不可序列化
  2. scala.Option -> Serializable but abstract,所以当CodeGenerator生成如下代码时,失败了!
/* 081 */         // initializejavabean(newInstance(class scala.Option))
/* 082 */         final scala.Option value_9 = false ?
/* 083 */         null : new scala.Option();  // ---> Such initialization is not possible for abstract classes
/* 084 */         scala.Option javaBean_1 = value_9;
  1. org.apache.spark.api.java.Optional -> Spark 的 Optional 实现是可序列化的,但具有私有构造函数。因此,它失败并出现错误:没有找到适用的 constructor/method 零实际参数 。由于这是 final class,因此无法扩展它。
/* 081 */         // initializejavabean(newInstance(class org.apache.spark.api.java.Optional))
/* 082 */         final org.apache.spark.api.java.Optional value_9 = false ?
/* 083 */         null : new org.apache.spark.api.java.Optional();
/* 084 */         org.apache.spark.api.java.Optional javaBean_1 = value_9;
/* 085 */         if (!false) {

一种选择是在数据 class 中使用普通的 Java 可选项,然后使用 Kryo 作为序列化器。

Encoder en = Encoders.kryo(TestClass.class);

Dataset<TestClass> ds = spark.createDataset(Arrays.asList(
        new TestClass("item 1", Optional.of(1)),
        new TestClass("item .", Optional.empty())
), en);

ds.collectAsList().forEach(x -> System.out.println("Found " + x));

输出:

Found TestClass(id=item 1, optionalInt=Optional[1])
Found TestClass(id=item ., optionalInt=Optional.empty)

使用 Kryo 有一个缺点:此编码器以二进制格式编码:

ds.printSchema();
ds.show(false);

打印

root
 |-- value: binary (nullable = true)

+-------------------------------------------------------------------------------------------------------+
|value                                                                                                  |
+-------------------------------------------------------------------------------------------------------+
|[01 00 4A 61 76 61 53 74 61 72 74 65 72 24 54 65 73 74 43 6C 61 73 F3 01 01 69 74 65 6D 20 B1 01 02 02]|
|[01 00 4A 61 76 61 53 74 61 72 74 65 72 24 54 65 73 74 43 6C 61 73 F3 01 01 69 74 65 6D 20 AE 01 00]   |
+-------------------------------------------------------------------------------------------------------+

获取使用 Kryo 编码的数据集的正常输出列的 udf-based 解决方案描述了 this answer


可能有点 off-topic 但可能开始寻找 long-term 解决方案是查看 JavaTypeInference. The methods serializerFor and deserializerFor are used by ExpressionEncoder.javaBean 的代码以创建编码器的序列化器和反序列化器部分 Java豆子.

在这个pattern matching block

typeToken.getRawType match {
   case c if c == classOf[String] => createSerializerForString(inputObject)
   case c if c == classOf[java.time.Instant] => createSerializerForJavaInstant(inputObject)
   case c if c == classOf[java.sql.Timestamp] => createSerializerForSqlTimestamp(inputObject)
   case c if c == classOf[java.time.LocalDate] => createSerializerForJavaLocalDate(inputObject)
   case c if c == classOf[java.sql.Date] => createSerializerForSqlDate(inputObject)
   [...]

缺少 java.util.Optional 的处理。它可能会被添加到这里以及相应的 deserialize method 中。这将允许 Java 个 bean 具有 Optional.

类型的属性