避免指定模式两次 (Spark/scala)
Avoid specifying schema twice (Spark/scala)
我需要按特定顺序遍历数据框并应用一些复杂的逻辑来计算新列。
此外,我强烈希望以通用方式进行操作,这样我就不必列出一行的所有列并执行 df.as[my_record]
或 case Row(...) =>
,如 所示。相反,我想通过名称访问行列,并将结果列添加到源行。
下面的方法工作得很好,但我想避免指定模式两次:第一次是这样我可以在迭代时按名称访问列,第二次是处理输出。
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
val q = """
select 2 part, 1 id
union all select 2 part, 4 id
union all select 2 part, 3 id
union all select 2 part, 2 id
"""
val df = spark.sql(q)
def f_row(iter: Iterator[Row]) : Iterator[Row] = {
if (iter.hasNext) {
def complex_logic(p: Int): Integer = if (p == 3) null else p * 10;
val head = iter.next
val schema = StructType(head.schema.fields :+ StructField("result", IntegerType))
val r =
new GenericRowWithSchema((head.toSeq :+ complex_logic(head.getAs("id"))).toArray, schema)
iter.scanLeft(r)((r1, r2) =>
new GenericRowWithSchema((r2.toSeq :+ complex_logic(r2.getAs("id"))).toArray, schema)
)
} else iter
}
val schema = StructType(df.schema.fields :+ StructField("result", IntegerType))
val encoder = RowEncoder(schema)
df.repartition($"part").sortWithinPartitions($"id").mapPartitions(f_row)(encoder).show
应用 mapPartitions
后丢失了哪些信息,因此没有显式编码器无法处理输出?如何避免指定它?
What information is lost after applying mapPartitions so output cannot be processed without
信息几乎不会丢失 - 从一开始就不存在 - Row
或 InternalRow
的子类基本上是无类型的可变形状容器,不提供任何有用的类型信息, 可用于派生 Encoder
.
GenericRowWithSchema
中的 schema
无关紧要,因为它根据元数据而不是类型来描述内容。
How to avoid specifying it?
抱歉,你运气不好。如果你想在静态类型的语言中使用动态类型的结构(一包 Any
),你必须付出代价,这里提供了一个 Encoder
.
好的 - 我已经检查了我的一些 spark 代码并且将 .mapPartitions 与数据集一起使用 API 不需要我明确地 build/pass 编码器。
你需要这样的东西:
case class Before(part: Int, id: Int)
case class After(part: Int, id: Int, newCol: String)
import spark.implicits._
// Note column names/types must match case class constructor parameters.
val beforeDS = <however you obtain your input DF>.as[Before]
def f_row(it: Iterator[Before]): Iterator[After] = ???
beforeDS.reparition($"part").sortWithinPartitions($"id").mapPartitions(f_row).show
我觉得下面的解释很充分,也许对其他人有用。
mapPartitions
需要 Encoder
,否则它无法从迭代器或 Row
构造 Dataset
。即使每一行都有一个模式,该模式不能由 Dataset[U]
.
的构造函数派生(使用)
def mapPartitions[U : Encoder](func: Iterator[T] => Iterator[U]): Dataset[U] = {
new Dataset[U](
sparkSession,
MapPartitions[T, U](func, logicalPlan),
implicitly[Encoder[U]])
}
另一方面,不调用 mapPartitions
Spark 可以使用从初始查询派生的模式,因为原始列的结构(元数据)没有改变。
我在这个答案中描述了替代方案:。
我需要按特定顺序遍历数据框并应用一些复杂的逻辑来计算新列。
此外,我强烈希望以通用方式进行操作,这样我就不必列出一行的所有列并执行 df.as[my_record]
或 case Row(...) =>
,如
下面的方法工作得很好,但我想避免指定模式两次:第一次是这样我可以在迭代时按名称访问列,第二次是处理输出。
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
val q = """
select 2 part, 1 id
union all select 2 part, 4 id
union all select 2 part, 3 id
union all select 2 part, 2 id
"""
val df = spark.sql(q)
def f_row(iter: Iterator[Row]) : Iterator[Row] = {
if (iter.hasNext) {
def complex_logic(p: Int): Integer = if (p == 3) null else p * 10;
val head = iter.next
val schema = StructType(head.schema.fields :+ StructField("result", IntegerType))
val r =
new GenericRowWithSchema((head.toSeq :+ complex_logic(head.getAs("id"))).toArray, schema)
iter.scanLeft(r)((r1, r2) =>
new GenericRowWithSchema((r2.toSeq :+ complex_logic(r2.getAs("id"))).toArray, schema)
)
} else iter
}
val schema = StructType(df.schema.fields :+ StructField("result", IntegerType))
val encoder = RowEncoder(schema)
df.repartition($"part").sortWithinPartitions($"id").mapPartitions(f_row)(encoder).show
应用 mapPartitions
后丢失了哪些信息,因此没有显式编码器无法处理输出?如何避免指定它?
What information is lost after applying mapPartitions so output cannot be processed without
信息几乎不会丢失 - 从一开始就不存在 - Row
或 InternalRow
的子类基本上是无类型的可变形状容器,不提供任何有用的类型信息, 可用于派生 Encoder
.
GenericRowWithSchema
中的 schema
无关紧要,因为它根据元数据而不是类型来描述内容。
How to avoid specifying it?
抱歉,你运气不好。如果你想在静态类型的语言中使用动态类型的结构(一包 Any
),你必须付出代价,这里提供了一个 Encoder
.
好的 - 我已经检查了我的一些 spark 代码并且将 .mapPartitions 与数据集一起使用 API 不需要我明确地 build/pass 编码器。
你需要这样的东西:
case class Before(part: Int, id: Int)
case class After(part: Int, id: Int, newCol: String)
import spark.implicits._
// Note column names/types must match case class constructor parameters.
val beforeDS = <however you obtain your input DF>.as[Before]
def f_row(it: Iterator[Before]): Iterator[After] = ???
beforeDS.reparition($"part").sortWithinPartitions($"id").mapPartitions(f_row).show
我觉得下面的解释很充分,也许对其他人有用。
mapPartitions
需要 Encoder
,否则它无法从迭代器或 Row
构造 Dataset
。即使每一行都有一个模式,该模式不能由 Dataset[U]
.
def mapPartitions[U : Encoder](func: Iterator[T] => Iterator[U]): Dataset[U] = {
new Dataset[U](
sparkSession,
MapPartitions[T, U](func, logicalPlan),
implicitly[Encoder[U]])
}
另一方面,不调用 mapPartitions
Spark 可以使用从初始查询派生的模式,因为原始列的结构(元数据)没有改变。
我在这个答案中描述了替代方案: