Spark Dataset fails to resolve a column after multiple joins
Suppose I have these case classes:
case class Employee(id: Long, proj_id: Long, office_id: Long, salary: Long)
case class Renumeration(id: Long, amount: Long)
I want to use Spark to update a collection of Employee based on Renumeration:
val right: Dataset[Renumeration] = ???
val left: Dataset[Employee] = ???
left.joinWith(broadcast(right),left("proj_id") === right("id"),"leftouter")
.map { case(left,right) => updateProj(left,right) }
.joinWith(broadcast(right),left("office_id") === right("id"),"leftouter")
.map { case(left,right) => updateOffice(left,right) }
def updateProj(emp: Employee, ren: Renumeration): Employee = ??? // business logic
def updateOffice(emp: Employee, ren: Renumeration): Employee = ??? // business logic
The first join and map work, but when I introduce the second join, Spark fails to resolve the id column and reports the following instead:
org.apache.spark.sql.AnalysisException: Resolved attribute(s) office_id#42L missing from id#114L,salary#117L,id#34L,amount#35L,proj_id#115L,office_id#116L in operator !Join LeftOuter, (office_id#42L = id#34L). Attribute(s) with the same name appear in the operation: office_id. Please check if the right attribute(s) are used.;;
!Join LeftOuter, (office_id#42L = id#34L)
:- SerializeFromObject [assertnotnull(assertnotnull(input[0, Employee, true])).id AS id#114L, assertnotnull(assertnotnull(input[0, Employee, true])).proj_id AS proj_id#115L, assertnotnull(assertnotnull(input[0, Employee, true])).office_id AS office_id#116L, assertnotnull(assertnotnull(input[0, Employee, true])).salary AS salary#117L]
: +- MapElements <function1>, class scala.Tuple2, [StructField(_1,StructType(StructField(id,LongType,false), StructField(proj_id,LongType,false), StructField(office_id,LongType,false), StructField(salary,LongType,false)),true), StructField(_2,StructType(StructField(id,LongType,false), StructField(amount,LongType,false)),true)], obj#113: Employee
: +- DeserializeToObject newInstance(class scala.Tuple2), obj#112: scala.Tuple2
: +- Join LeftOuter, (_1#103.proj_id = _2#104.id)
: :- Project [named_struct(id, id#40L, proj_id, proj_id#41L, office_id, office_id#42L, salary, salary#43L) AS _1#103]
: : +- LocalRelation <empty>, [id#40L, proj_id#41L, office_id#42L, salary#43L]
: +- Project [named_struct(id, id#34L, amount, amount#35L) AS _2#104]
: +- ResolvedHint (broadcast)
: +- LocalRelation <empty>, [id#34L, amount#35L]
+- ResolvedHint (broadcast)
+- LocalRelation <empty>, [id#34L, amount#35L]
Any idea why Spark cannot resolve the column even though I am using typed Datasets? And what should I do to make this work, if it is possible at all?
The error occurs because the reference returned by left("office_id") no longer exists in the newly projected Dataset, i.e. the one produced by the first join and map.
If you look closely at the execution plan, in the nested relation
: +- LocalRelation <empty>, [id#40L, proj_id#41L, office_id#42L, salary#43L]
you can see that the reference to office_id in the left Dataset is office_id#42L. However, if you look further up the plan, you will notice that this reference no longer exists in the later projection
SerializeFromObject [assertnotnull(assertnotnull(input[0, Employee, true])).id AS id#114L, assertnotnull(assertnotnull(input[0, Employee, true])).proj_id AS proj_id#115L, assertnotnull(assertnotnull(input[0, Employee, true])).office_id AS office_id#116L, assertnotnull(assertnotnull(input[0, Employee, true])).salary AS salary#117L]
because the only office_id reference available there is office_id#116L.
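If you want to see these attribute IDs yourself, you can print the analyzed plan right after the first join and map. A minimal sketch, assuming a local SparkSession and the case classes/update functions from the question:

import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.functions.broadcast

// Minimal sketch: a local session and empty Datasets, just to inspect the plan.
val spark = SparkSession.builder().master("local[*]").appName("plan-check").getOrCreate()
import spark.implicits._

val left: Dataset[Employee] = Seq.empty[Employee].toDS()
val right: Dataset[Renumeration] = Seq.empty[Renumeration].toDS()

val afterFirstJoin = left
  .joinWith(broadcast(right), left("proj_id") === right("id"), "leftouter")
  .map { case (emp, ren) => updateProj(emp, ren) }

// explain(true) prints the parsed, analyzed, optimized and physical plans;
// in the analyzed plan, office_id carries a new expression ID (office_id#116L
// in the question's output), so the stale left("office_id") reference cannot resolve.
afterFirstJoin.explain(true)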
To work around this, you can use an intermediary/temporary variable, e.g.:
val right: Dataset[Renumeration] = ???
val left: Dataset[Employee] = ???
val leftTemp = left.joinWith(broadcast(right),left("proj_id") === right("id"),"leftouter")
.map { case(left,right) => updateProj(left,right) }
val leftFinal = leftTemp.joinWith(broadcast(right),leftTemp("office_id") === right("id"),"leftouter")
.map { case(left,right) => updateOffice(left,right) }
Alternatively, you can try using the shorthand $"office_id" === right("id") in your join, e.g.:
left.joinWith(broadcast(right),left("proj_id") === right("id"),"leftouter")
.map { case(left,right) => updateProj(left,right) }
.joinWith(broadcast(right),$"office_id" === right("id"),"leftouter")
.map { case(left,right) => updateOffice(left,right) }
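For completeness, here is one way the update functions could look. This is only a sketch: the null check is there because, with a left outer joinWith, unmatched rows come back with a null Renumeration, and the salary adjustment itself is purely illustrative.

// Hypothetical implementations; only the null handling is essential, the
// salary logic is made up for illustration.
def updateProj(emp: Employee, ren: Renumeration): Employee =
  if (ren == null) emp else emp.copy(salary = emp.salary + ren.amount)

def updateOffice(emp: Employee, ren: Renumeration): Employee =
  if (ren == null) emp else emp.copy(salary = emp.salary + ren.amount)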
Let me know if this works for you.