如何使用 java 对 spark 数据集中的可选字段进行编码?
How to encode optional fields in spark dataset with java?
我不想对数据集中使用的 class 字段使用空值。我尝试使用 scala Option
和 java Optional
但它失败了:
@AllArgsConstructor // lombok
@NoArgsConstructor // mutable type is required in java :(
@Data // see
public static class TestClass {
String id;
Option<Integer> optionalInt;
}
@Test
public void testDatasetWithOptionField(){
Dataset<TestClass> ds = spark.createDataset(Arrays.asList(
new TestClass("item 1", Option.apply(1)),
new TestClass("item .", Option.empty())
), Encoders.bean(TestClass.class));
ds.collectAsList().forEach(x -> System.out.println("Found " + x));
}
运行时失败,消息为 File 'generated.java', Line 77, Column 47: Cannot instantiate abstract "scala.Option"
问题:有没有一种方法可以使用 java 对数据集中不带 null 的可选字段进行编码?
附属问题:顺便说一句,我也没有在 scala 中使用太多数据集,你能验证一下在 scala 中是否真的可以对包含选项字段的案例 class 进行编码?
注意:这用于中间数据集,即既不能读也不能写的东西(但用于 spark 内部序列化)
这在 Scala 中相当简单。
Scala 实现
import org.apache.spark.sql.{Encoders, SparkSession}
object Test {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder
.appName("Stack-scala")
.master("local[2]")
.getOrCreate()
val ds = spark.createDataset(Seq(
TestClass("Item 1", Some(1)),
TestClass("Item 2", None)
))( Encoders.product[TestClass])
ds.collectAsList().forEach(println)
spark.stop()
}
case class TestClass(
id: String,
optionalInt: Option[Int] )
}
Java
Java 中有各种选项 class。然而,none 他们工作 out-of-the-box。
java.util.Optional
: 不可序列化
scala.Option
-> Serializable but abstract,所以当CodeGenerator
生成如下代码时,失败了!
/* 081 */ // initializejavabean(newInstance(class scala.Option))
/* 082 */ final scala.Option value_9 = false ?
/* 083 */ null : new scala.Option(); // ---> Such initialization is not possible for abstract classes
/* 084 */ scala.Option javaBean_1 = value_9;
org.apache.spark.api.java.Optional
-> Spark 的 Optional 实现是可序列化的,但具有私有构造函数。因此,它失败并出现错误:没有找到适用的 constructor/method 零实际参数 。由于这是 final
class,因此无法扩展它。
/* 081 */ // initializejavabean(newInstance(class org.apache.spark.api.java.Optional))
/* 082 */ final org.apache.spark.api.java.Optional value_9 = false ?
/* 083 */ null : new org.apache.spark.api.java.Optional();
/* 084 */ org.apache.spark.api.java.Optional javaBean_1 = value_9;
/* 085 */ if (!false) {
一种选择是在数据 class 中使用普通的 Java 可选项,然后使用 Kryo 作为序列化器。
Encoder en = Encoders.kryo(TestClass.class);
Dataset<TestClass> ds = spark.createDataset(Arrays.asList(
new TestClass("item 1", Optional.of(1)),
new TestClass("item .", Optional.empty())
), en);
ds.collectAsList().forEach(x -> System.out.println("Found " + x));
输出:
Found TestClass(id=item 1, optionalInt=Optional[1])
Found TestClass(id=item ., optionalInt=Optional.empty)
使用 Kryo 有一个缺点:此编码器以二进制格式编码:
ds.printSchema();
ds.show(false);
打印
root
|-- value: binary (nullable = true)
+-------------------------------------------------------------------------------------------------------+
|value |
+-------------------------------------------------------------------------------------------------------+
|[01 00 4A 61 76 61 53 74 61 72 74 65 72 24 54 65 73 74 43 6C 61 73 F3 01 01 69 74 65 6D 20 B1 01 02 02]|
|[01 00 4A 61 76 61 53 74 61 72 74 65 72 24 54 65 73 74 43 6C 61 73 F3 01 01 69 74 65 6D 20 AE 01 00] |
+-------------------------------------------------------------------------------------------------------+
获取使用 Kryo 编码的数据集的正常输出列的 udf-based 解决方案描述了 this answer。
可能有点 off-topic 但可能开始寻找 long-term 解决方案是查看 JavaTypeInference. The methods serializerFor and deserializerFor are used by ExpressionEncoder.javaBean 的代码以创建编码器的序列化器和反序列化器部分 Java豆子.
typeToken.getRawType match {
case c if c == classOf[String] => createSerializerForString(inputObject)
case c if c == classOf[java.time.Instant] => createSerializerForJavaInstant(inputObject)
case c if c == classOf[java.sql.Timestamp] => createSerializerForSqlTimestamp(inputObject)
case c if c == classOf[java.time.LocalDate] => createSerializerForJavaLocalDate(inputObject)
case c if c == classOf[java.sql.Date] => createSerializerForSqlDate(inputObject)
[...]
缺少 java.util.Optional
的处理。它可能会被添加到这里以及相应的 deserialize method 中。这将允许 Java 个 bean 具有 Optional
.
类型的属性
我不想对数据集中使用的 class 字段使用空值。我尝试使用 scala Option
和 java Optional
但它失败了:
@AllArgsConstructor // lombok
@NoArgsConstructor // mutable type is required in java :(
@Data // see
public static class TestClass {
String id;
Option<Integer> optionalInt;
}
@Test
public void testDatasetWithOptionField(){
Dataset<TestClass> ds = spark.createDataset(Arrays.asList(
new TestClass("item 1", Option.apply(1)),
new TestClass("item .", Option.empty())
), Encoders.bean(TestClass.class));
ds.collectAsList().forEach(x -> System.out.println("Found " + x));
}
运行时失败,消息为 File 'generated.java', Line 77, Column 47: Cannot instantiate abstract "scala.Option"
问题:有没有一种方法可以使用 java 对数据集中不带 null 的可选字段进行编码?
附属问题:顺便说一句,我也没有在 scala 中使用太多数据集,你能验证一下在 scala 中是否真的可以对包含选项字段的案例 class 进行编码?
注意:这用于中间数据集,即既不能读也不能写的东西(但用于 spark 内部序列化)
这在 Scala 中相当简单。
Scala 实现
import org.apache.spark.sql.{Encoders, SparkSession}
object Test {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder
.appName("Stack-scala")
.master("local[2]")
.getOrCreate()
val ds = spark.createDataset(Seq(
TestClass("Item 1", Some(1)),
TestClass("Item 2", None)
))( Encoders.product[TestClass])
ds.collectAsList().forEach(println)
spark.stop()
}
case class TestClass(
id: String,
optionalInt: Option[Int] )
}
Java
Java 中有各种选项 class。然而,none 他们工作 out-of-the-box。
java.util.Optional
: 不可序列化scala.Option
-> Serializable but abstract,所以当CodeGenerator
生成如下代码时,失败了!
/* 081 */ // initializejavabean(newInstance(class scala.Option))
/* 082 */ final scala.Option value_9 = false ?
/* 083 */ null : new scala.Option(); // ---> Such initialization is not possible for abstract classes
/* 084 */ scala.Option javaBean_1 = value_9;
org.apache.spark.api.java.Optional
-> Spark 的 Optional 实现是可序列化的,但具有私有构造函数。因此,它失败并出现错误:没有找到适用的 constructor/method 零实际参数 。由于这是final
class,因此无法扩展它。
/* 081 */ // initializejavabean(newInstance(class org.apache.spark.api.java.Optional))
/* 082 */ final org.apache.spark.api.java.Optional value_9 = false ?
/* 083 */ null : new org.apache.spark.api.java.Optional();
/* 084 */ org.apache.spark.api.java.Optional javaBean_1 = value_9;
/* 085 */ if (!false) {
一种选择是在数据 class 中使用普通的 Java 可选项,然后使用 Kryo 作为序列化器。
Encoder en = Encoders.kryo(TestClass.class);
Dataset<TestClass> ds = spark.createDataset(Arrays.asList(
new TestClass("item 1", Optional.of(1)),
new TestClass("item .", Optional.empty())
), en);
ds.collectAsList().forEach(x -> System.out.println("Found " + x));
输出:
Found TestClass(id=item 1, optionalInt=Optional[1])
Found TestClass(id=item ., optionalInt=Optional.empty)
使用 Kryo 有一个缺点:此编码器以二进制格式编码:
ds.printSchema();
ds.show(false);
打印
root
|-- value: binary (nullable = true)
+-------------------------------------------------------------------------------------------------------+
|value |
+-------------------------------------------------------------------------------------------------------+
|[01 00 4A 61 76 61 53 74 61 72 74 65 72 24 54 65 73 74 43 6C 61 73 F3 01 01 69 74 65 6D 20 B1 01 02 02]|
|[01 00 4A 61 76 61 53 74 61 72 74 65 72 24 54 65 73 74 43 6C 61 73 F3 01 01 69 74 65 6D 20 AE 01 00] |
+-------------------------------------------------------------------------------------------------------+
获取使用 Kryo 编码的数据集的正常输出列的 udf-based 解决方案描述了 this answer。
可能有点 off-topic 但可能开始寻找 long-term 解决方案是查看 JavaTypeInference. The methods serializerFor and deserializerFor are used by ExpressionEncoder.javaBean 的代码以创建编码器的序列化器和反序列化器部分 Java豆子.
typeToken.getRawType match {
case c if c == classOf[String] => createSerializerForString(inputObject)
case c if c == classOf[java.time.Instant] => createSerializerForJavaInstant(inputObject)
case c if c == classOf[java.sql.Timestamp] => createSerializerForSqlTimestamp(inputObject)
case c if c == classOf[java.time.LocalDate] => createSerializerForJavaLocalDate(inputObject)
case c if c == classOf[java.sql.Date] => createSerializerForSqlDate(inputObject)
[...]
缺少 java.util.Optional
的处理。它可能会被添加到这里以及相应的 deserialize method 中。这将允许 Java 个 bean 具有 Optional
.