使用 udf 和递归在数据框中创建新列
Create new column in dataframe with udf and recursion
我有两个 parquet 文件,一个用 inode 编号描述文件,一个描述 inode 名称和父 inode,我需要从第二个文件重建完整路径。
我的 table 和 inode 的描述被命名为 idirs_table_read
并且格式如下(这是一个完整的例子):
iparent,iname,ichild
93767723,folder12,40715069
65688175,level4,93767723
80373386,name,65688175
22746413,level2,80373386
24,base,22746413
我想要一个 inode 编号能够重建文件路径。
例如对于 inode 93767723,路径是:/base/level2/name/level4
我定义了两个函数(一个递归函数,另一个过程函数)这两个函数在像这样使用时有效 newPathRecursive(1236549)
但在 withColumns
中使用时失败:
def newPathRecursive( inumber : Int ):String = {
var composite = ""
var result = idirs_table_read.select("iparent", "iname").filter($"ichild"===inumber)
if ( (result.count() == 1) && (result.first()(0) != inumber) ) {
var num= result.first()(0).asInstanceOf[Int]
composite= newPath(num) + "/" + result.first()(1).asInstanceOf[String]
}
return composite
}
def newPathProcedurale (inumber : Int ):String = {
var composite = ""
var go = true
var parentInode=inumber
while(go){
var result = idirs_table_read.select("iparent", "iname").filter($"ichild"===parentInode)
if ( (result.count() == 1) && (result.first()(0) != inumber) ) {
println(result.first()(0)+","+ result.first()(1)+","+ parentInode)
parentInode = result.first()(0).asInstanceOf[Int]
composite= "/" + result.first()(1).asInstanceOf[String] + composite
}else{
go=false
}
}
return composite.asInstanceOf[String]
}
val buildpath2 = udf[String, Int](newPath2)
val buildpath = udf[String, Int](newPath)
我的目标是能够通过此路径替换另一个 table 中的 inode 编号,但是当我尝试在 select 中使用该函数时,我得到类似的东西:
df.withColumn("newcol",buildpath ( $"inumber" )
Caused by: java.lang.reflect.InvocationTargetException: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1521.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1521.0 (TID 14859, ip.ip.ip.ip, executor 1): org.apache.spark.SparkException: Failed to execute user defined function($anonfun: (int) => string)
能否请您帮助我处理这段代码,并最终建议我更好地实现该算法并更好地使用它?
我的目标只是用完整路径而不是 inode(这不是人类可读的)从这两个文件构建一个新的 parquet 文件
由于 post 提到的重复项实际上与我的重复项不同,我不理解解释,因为真正的答案是在评论中而不是在实际答案中,这是答案:
如果 udf 内部使用的数据集不是本地的,我想做的事情是不可能的:
inodes_table_read.isLocal
false
所以我要使用其他东西,我会post解释。
我有两个 parquet 文件,一个用 inode 编号描述文件,一个描述 inode 名称和父 inode,我需要从第二个文件重建完整路径。
我的 table 和 inode 的描述被命名为 idirs_table_read
并且格式如下(这是一个完整的例子):
iparent,iname,ichild
93767723,folder12,40715069
65688175,level4,93767723
80373386,name,65688175
22746413,level2,80373386
24,base,22746413
我想要一个 inode 编号能够重建文件路径。
例如对于 inode 93767723,路径是:/base/level2/name/level4
我定义了两个函数(一个递归函数,另一个过程函数)这两个函数在像这样使用时有效 newPathRecursive(1236549)
但在 withColumns
中使用时失败:
def newPathRecursive( inumber : Int ):String = {
var composite = ""
var result = idirs_table_read.select("iparent", "iname").filter($"ichild"===inumber)
if ( (result.count() == 1) && (result.first()(0) != inumber) ) {
var num= result.first()(0).asInstanceOf[Int]
composite= newPath(num) + "/" + result.first()(1).asInstanceOf[String]
}
return composite
}
def newPathProcedurale (inumber : Int ):String = {
var composite = ""
var go = true
var parentInode=inumber
while(go){
var result = idirs_table_read.select("iparent", "iname").filter($"ichild"===parentInode)
if ( (result.count() == 1) && (result.first()(0) != inumber) ) {
println(result.first()(0)+","+ result.first()(1)+","+ parentInode)
parentInode = result.first()(0).asInstanceOf[Int]
composite= "/" + result.first()(1).asInstanceOf[String] + composite
}else{
go=false
}
}
return composite.asInstanceOf[String]
}
val buildpath2 = udf[String, Int](newPath2)
val buildpath = udf[String, Int](newPath)
我的目标是能够通过此路径替换另一个 table 中的 inode 编号,但是当我尝试在 select 中使用该函数时,我得到类似的东西:
df.withColumn("newcol",buildpath ( $"inumber" )
Caused by: java.lang.reflect.InvocationTargetException: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1521.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1521.0 (TID 14859, ip.ip.ip.ip, executor 1): org.apache.spark.SparkException: Failed to execute user defined function($anonfun: (int) => string)
能否请您帮助我处理这段代码,并最终建议我更好地实现该算法并更好地使用它?
我的目标只是用完整路径而不是 inode(这不是人类可读的)从这两个文件构建一个新的 parquet 文件
由于 post 提到的重复项实际上与我的重复项不同,我不理解解释,因为真正的答案是在评论中而不是在实际答案中,这是答案:
如果 udf 内部使用的数据集不是本地的,我想做的事情是不可能的:
inodes_table_read.isLocal
false
所以我要使用其他东西,我会post解释。