火花 "package.TreeNodeException" 错误 python "java.lang.RuntimeException: Couldn't find pythonUDF"
spark "package.TreeNodeException" error python "java.lang.RuntimeException: Couldn't find pythonUDF"
I'm using PySpark 2.1 on Databricks.
I wrote a UDF to generate a unique uuid for each row of a PySpark dataframe. The dataframes I'm working with are relatively small (< 10,000 rows) and should never grow beyond that.
I know Spark has the built-in functions zipWithIndex() and zipWithUniqueId() that can generate row indices, but I was specifically asked to use uuids for this particular project.
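For comparison only, here is a minimal sketch of what a zipWithIndex()-based row index would look like (this is not what the project required; the dataframe name df and the column name row_index are hypothetical):
indexed_df = (
    df.rdd
    .zipWithIndex()                           # -> (Row, long index) pairs
    .map(lambda pair: pair[0] + (pair[1],))   # Row is a tuple, so append the index
    .toDF(df.columns + ["row_index"])
)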
The UDF udf_insert_uuid works fine on small datasets, but it appears to conflict with the built-in Spark function subtract.
What is causing this error:
package.TreeNodeException: Binding attribute, tree: pythonUDF0#104830
Deeper in the driver stack trace it also says:
Caused by: java.lang.RuntimeException: Couldn't find pythonUDF0#104830
Below is the code I ran:
Create a function that generates a set of unique_ids
import pandas
from pyspark.sql.functions import *
from pyspark.sql.types import *
import uuid

# define a python function
def insert_uuid():
    user_created_uuid = str(uuid.uuid1())
    return user_created_uuid

# register the python function for use in dataframes
udf_insert_uuid = udf(insert_uuid, StringType())
Create a dataframe with 50 elements
import pandas
from pyspark.sql.functions import *
from pyspark.sql.types import *

list_of_numbers = range(1000, 1050)
temp_pandasDF = pandas.DataFrame(list_of_numbers, index=None)

sparkDF = (
    spark
    .createDataFrame(temp_pandasDF, ["data_points"])
    .withColumn("labels", when(col("data_points") < 1025, "a").otherwise("b"))  # "a" if data_points < 1025, else "b"
    .repartition("labels")
)
sparkDF.createOrReplaceTempView("temp_spark_table")

# add a unique id for each row
# the udf works fine in this line of code
sparkDF = sparkDF.withColumn("id", lit(udf_insert_uuid()))
sparkDF.show(20, False)
sparkDF output:
+-----------+------+------------------------------------+
|data_points|labels|id |
+-----------+------+------------------------------------+
|1029 |b |d3bb91e0-9cc8-11e7-9b70-00163e9986ba|
|1030 |b |d3bb95e6-9cc8-11e7-9b70-00163e9986ba|
|1035 |b |d3bb982a-9cc8-11e7-9b70-00163e9986ba|
|1036 |b |d3bb9a50-9cc8-11e7-9b70-00163e9986ba|
|1042 |b |d3bb9c6c-9cc8-11e7-9b70-00163e9986ba|
+-----------+------+------------------------------------+
only showing top 5 rows
Create another DF with values different from sparkDF
list_of_numbers = range(1025, 1075)
temp_pandasDF = pandas.DataFrame(list_of_numbers, index=None)

new_DF = (
    spark
    .createDataFrame(temp_pandasDF, ["data_points"])
    .withColumn("labels", when(col("data_points") < 1025, "a").otherwise("b"))  # "a" if data_points < 1025, else "b"
    .repartition("labels")
)
new_DF.show(5, False)
new_DF output:
+-----------+------+
|data_points|labels|
+-----------+------+
|1029 |b |
|1030 |b |
|1035 |b |
|1036 |b |
|1042 |b |
+-----------+------+
only showing top 5 rows
Compare the values in new_DF and sparkDF
values_not_in_new_DF = (new_DF.subtract(sparkDF.drop("id")))
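For context, subtract() returns the rows of new_DF that do not appear in sparkDF once the id column is dropped; given the two ranges above, that should be data_points 1050 through 1074, all labelled "b". A quick sanity check (not in the original post, output elided) would be:
# expected: data_points 1050-1074, all with label "b", since sparkDF covers 1000-1049
values_not_in_new_DF.show(5, False)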
Add a uuid to each row with the udf and display it
display(values_not_in_new_DF
    .withColumn("id", lit(udf_insert_uuid()))  # add a column of unique uuid's
)
The following error results:
package.TreeNodeException: Binding attribute, tree: pythonUDF0#104830
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: pythonUDF0#104830 at
org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56) at
org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference.applyOrElse(BoundAttribute.scala:88) at
org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference.applyOrElse(BoundAttribute.scala:87) at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:268) at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:268) at
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70) at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:267) at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown.apply(TreeNode.scala:273) at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown.apply(TreeNode.scala:273) at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:307) at
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188) at
org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:305) at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:273) at
org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:257) at
org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:87) at
org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun.apply(HashAggregateExec.scala:473) at
org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun.apply(HashAggregateExec.scala:472) at
scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:244) at
scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:244) at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at
scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at
scala.collection.AbstractTraversable.map(Traversable.scala:105) at
org.apache.spark.sql.execution.aggregate.HashAggregateExec.generateResultCode(HashAggregateExec.scala:472) at
org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduceWithKeys(HashAggregateExec.scala:610) at
org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduce(HashAggregateExec.scala:148) at
org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce.apply(WholeStageCodegenExec.scala:83) at
org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce.apply(WholeStageCodegenExec.scala:78) at
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery.apply(SparkPlan.scala:135) at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132) at
org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:78) at
org.apache.spark.sql.execution.aggregate.HashAggregateExec.produce(HashAggregateExec.scala:38) at
org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:313) at
org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:354) at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:114) at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:114) at
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery.apply(SparkPlan.scala:135) at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132) at
org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113) at
org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:225) at
org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:308) at
org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38) at
org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:2807) at
org.apache.spark.sql.Dataset$$anonfun$head.apply(Dataset.scala:2132) at
org.apache.spark.sql.Dataset$$anonfun$head.apply(Dataset.scala:2132) at
org.apache.spark.sql.Dataset$$anonfun.apply(Dataset.scala:2791) at
org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId.apply(SQLExecution.scala:87) at
org.apache.spark.sql.execution.SQLExecution$.withFileAccessAudit(SQLExecution.scala:53) at
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:70) at
org.apache.spark.sql.Dataset.withAction(Dataset.scala:2790) at
org.apache.spark.sql.Dataset.head(Dataset.scala:2132) at
org.apache.spark.sql.Dataset.take(Dataset.scala:2345) at
com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation0(OutputAggregator.scala:81) at
com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation(OutputAggregator.scala:42) at
com.databricks.backend.daemon.driver.PythonDriverLocal$$anonfun$getResultBuffer.apply(PythonDriverLocal.scala:461) at
com.databricks.backend.daemon.driver.PythonDriverLocal$$anonfun$getResultBuffer.apply(PythonDriverLocal.scala:441) at
com.databricks.backend.daemon.driver.PythonDriverLocal.withInterpLock(PythonDriverLocal.scala:394) at
com.databricks.backend.daemon.driver.PythonDriverLocal.getResultBuffer(PythonDriverLocal.scala:441) at
com.databricks.backend.daemon.driver.PythonDriverLocal.com$databricks$backend$daemon$driver$PythonDriverLocal$$outputSuccess(PythonDriverLocal.scala:428) at
com.databricks.backend.daemon.driver.PythonDriverLocal$$anonfun$repl.apply(PythonDriverLocal.scala:178) at
com.databricks.backend.daemon.driver.PythonDriverLocal$$anonfun$repl.apply(PythonDriverLocal.scala:175) at
com.databricks.backend.daemon.driver.PythonDriverLocal.withInterpLock(PythonDriverLocal.scala:394) at
com.databricks.backend.daemon.driver.PythonDriverLocal.repl(PythonDriverLocal.scala:175) at
com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute.apply(DriverLocal.scala:230) at
com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute.apply(DriverLocal.scala:211) at
com.databricks.logging.UsageLogging$$anonfun$withAttributionContext.apply(UsageLogging.scala:173) at
scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at
com.databricks.logging.UsageLogging$class.withAttributionContext(UsageLogging.scala:168) at
com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:39) at
com.databricks.logging.UsageLogging$class.withAttributionTags(UsageLogging.scala:206) at
com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:39) at
com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:211) at
com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand.apply(DriverWrapper.scala:589) at
com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand.apply(DriverWrapper.scala:589) at
scala.util.Try$.apply(Try.scala:161) at
com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:584) at
com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:488) at
com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:391) at
com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:348) at
com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:215) at
java.lang.Thread.run(Thread.java:745) Caused by: java.lang.RuntimeException: Couldn't find pythonUDF0#104830 in [data_points#104799L,labels#104802] at
scala.sys.package$.error(package.scala:27) at
org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$$anonfun$applyOrElse.apply(BoundAttribute.scala:94) at
org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$$anonfun$applyOrElse.apply(BoundAttribute.scala:88) at
org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52) ... 82 more
When I ran your script, I got the same error you did. The only way I found to make it work was to pass the UDF a column instead of no arguments:
def insert_uuid(col):
    user_created_uuid = str(uuid.uuid1())
    return user_created_uuid

udf_insert_uuid = udf(insert_uuid, StringType())
Then call it on labels, for example:
values_not_in_new_DF\
    .withColumn("id", udf_insert_uuid("labels"))\
    .show()
There is no need to use lit.
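As a further option (not part of the original answer, and only relevant to later releases): Spark 2.3+ ships a built-in SQL uuid() expression that sidesteps the Python UDF, and with it this binding issue, entirely. It is not available on the 2.1 cluster used in the question, so the column-argument workaround above is the way to go there.
# Hedged sketch: requires Spark 2.3+ where the SQL uuid() function exists;
# not applicable to the Spark 2.1 setup described in the question.
from pyspark.sql.functions import expr

values_not_in_new_DF \
    .withColumn("id", expr("uuid()")) \
    .show(5, False)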