具有类型化列的 Spark 数据集 select
Spark Dataset select with typedcolumn
查看 spark 数据集上的 select()
函数,有各种生成的函数签名:
(c1: TypedColumn[MyClass, U1],c2: TypedColumn[MyClass, U2] ....)
这似乎暗示我应该能够直接引用 MyClass 的成员并且是类型安全的,但我不确定如何...
ds.select("member")
当然有效.. 似乎 ds.select(_.member)
也可能以某种方式有效?
在 select
的 Scala DSL 中,有很多方法可以识别 Column
:
- 来自符号:
'name
- 来自字符串:
$"name"
或 col(name)
- 来自表达式:
expr("nvl(name, 'unknown') as renamed")
要从 Column
获得 TypedColumn
,您只需使用 myCol.as[T]
。
例如:ds.select(col("name").as[String])
如果你想要 ds.select(_.member)
的等价物,只需使用 map
:
case class MyClass(member: MyMember, foo: A, bar: B)
val ds: DataSet[MyClass] = ???
val members: DataSet[MyMember] = ds.map(_.member)
编辑:不使用map
的说法。
一种更高效的方法是通过投影,而不是使用 map
。您失去了编译时类型检查,但作为交换,Catalyst 查询引擎有机会做一些更优化的事情。正如@Sim 在下面的评论中提到的那样,主要的优化集中在不需要将 MyClass
的全部内容从 Tungsten 内存 space 反序列化到 JVM 堆内存中——只是为了调用访问器——和然后将 _.member
的结果序列化回 Tungsten。
为了举一个更具体的例子,让我们像这样重新定义我们的数据模型:
// Make sure these are not nested classes
// (i.e. in a top level compilation units).
case class MyMember(something: Double)
case class MyClass(member: MyMember, foo: Int, bar: String)
这些需要 case
类 以便 SQLImplicits.newProductEncoder[T <: Product]
可以为我们提供 Dataset[T]
[=58= 所需的隐式 Encoder[MyClass]
].
现在我们可以使上面的例子更加具体:
val ds: Dataset[MyClass] = Seq(MyClass(MyMember(1.0), 2, "three")).toDS()
val membersMapped: Dataset[Double] = ds.map(_.member.something)
要查看幕后发生的事情,我们使用 explain()
方法:
membersMapped.explain()
== Physical Plan ==
*(1) SerializeFromObject [input[0, double, false] AS value#19]
+- *(1) MapElements <function1>, obj#18: double
+- *(1) DeserializeToObject newInstance(class MyClass), obj#17: MyClass
+- LocalTableScan [member#12, foo#13, bar#14]
这使得序列化 to/from 钨非常明显。
让我们使用投影得到相同的值[^1]:
val ds2: Dataset[Double] = ds.select($"member.something".as[Double])
ds2.explain()
== Physical Plan ==
LocalTableScan [something#25]
就是这样!一步 [^2]。除了将 MyClass
编码到原始数据集中之外,没有序列化。
[^1]:投影被定义为 $"member.something"
而不是 $"value.member.something"
的原因与 Catalyst 自动投影单列 DataFrame 的成员有关。
[^2]:公平地说,第一个物理计划中步骤旁边的 *
表明它们将由 WholeStageCodegenExec
实施,从而使这些步骤成为一个单一的,on-即时编译的 JVM 函数,它应用了自己的一组运行时优化。因此在实践中,您必须根据经验测试性能才能真正评估每种方法的好处。
查看 spark 数据集上的 select()
函数,有各种生成的函数签名:
(c1: TypedColumn[MyClass, U1],c2: TypedColumn[MyClass, U2] ....)
这似乎暗示我应该能够直接引用 MyClass 的成员并且是类型安全的,但我不确定如何...
ds.select("member")
当然有效.. 似乎 ds.select(_.member)
也可能以某种方式有效?
在 select
的 Scala DSL 中,有很多方法可以识别 Column
:
- 来自符号:
'name
- 来自字符串:
$"name"
或col(name)
- 来自表达式:
expr("nvl(name, 'unknown') as renamed")
要从 Column
获得 TypedColumn
,您只需使用 myCol.as[T]
。
例如:ds.select(col("name").as[String])
如果你想要 ds.select(_.member)
的等价物,只需使用 map
:
case class MyClass(member: MyMember, foo: A, bar: B)
val ds: DataSet[MyClass] = ???
val members: DataSet[MyMember] = ds.map(_.member)
编辑:不使用map
的说法。
一种更高效的方法是通过投影,而不是使用 map
。您失去了编译时类型检查,但作为交换,Catalyst 查询引擎有机会做一些更优化的事情。正如@Sim 在下面的评论中提到的那样,主要的优化集中在不需要将 MyClass
的全部内容从 Tungsten 内存 space 反序列化到 JVM 堆内存中——只是为了调用访问器——和然后将 _.member
的结果序列化回 Tungsten。
为了举一个更具体的例子,让我们像这样重新定义我们的数据模型:
// Make sure these are not nested classes
// (i.e. in a top level compilation units).
case class MyMember(something: Double)
case class MyClass(member: MyMember, foo: Int, bar: String)
这些需要 case
类 以便 SQLImplicits.newProductEncoder[T <: Product]
可以为我们提供 Dataset[T]
[=58= 所需的隐式 Encoder[MyClass]
].
现在我们可以使上面的例子更加具体:
val ds: Dataset[MyClass] = Seq(MyClass(MyMember(1.0), 2, "three")).toDS()
val membersMapped: Dataset[Double] = ds.map(_.member.something)
要查看幕后发生的事情,我们使用 explain()
方法:
membersMapped.explain()
== Physical Plan ==
*(1) SerializeFromObject [input[0, double, false] AS value#19]
+- *(1) MapElements <function1>, obj#18: double
+- *(1) DeserializeToObject newInstance(class MyClass), obj#17: MyClass
+- LocalTableScan [member#12, foo#13, bar#14]
这使得序列化 to/from 钨非常明显。
让我们使用投影得到相同的值[^1]:
val ds2: Dataset[Double] = ds.select($"member.something".as[Double])
ds2.explain()
== Physical Plan ==
LocalTableScan [something#25]
就是这样!一步 [^2]。除了将 MyClass
编码到原始数据集中之外,没有序列化。
[^1]:投影被定义为 $"member.something"
而不是 $"value.member.something"
的原因与 Catalyst 自动投影单列 DataFrame 的成员有关。
[^2]:公平地说,第一个物理计划中步骤旁边的 *
表明它们将由 WholeStageCodegenExec
实施,从而使这些步骤成为一个单一的,on-即时编译的 JVM 函数,它应用了自己的一组运行时优化。因此在实践中,您必须根据经验测试性能才能真正评估每种方法的好处。