Unable to call PySpark UDF function
I'm trying to use a UDF, but I get the following error:
import time
import datetime
from pyspark.sql.functions import lit, unix_timestamp, udf, col
from pyspark.sql.types import TimestampType, DecimalType
dict = [{'name': 'Alice', 'age': 1},{'name': 'Again', 'age': 2}]
df = spark.createDataFrame(dict)
timestamp1 = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
timestamp2 = datetime.datetime.fromtimestamp(time.time() + 90).strftime('%Y-%m-%d %H:%M:%S')
def calc_time(start, end):
    timefmt = "yyyy-MM-dd'T'HH:mm:ss"
    return unix_timestamp(end, format=timefmt) - unix_timestamp(start, format=timefmt)
calc_time_udf = udf(lambda start, end: calc_time(start, end), TimestampType())
new_df = (df.withColumn('time1', unix_timestamp(lit(timestamp1), 'yyyy-MM-dd HH:mm:ss').cast("timestamp"))
            .withColumn('time2', unix_timestamp(lit(timestamp2), 'yyyy-MM-dd HH:mm:ss').cast("timestamp")))
new_df.withColumn("DIFF", calc_time_udf(col("time1"), col("time2")).cast(DecimalType(20, 6))).show()
Error stack trace:
File "/opt/cloudera/parcels/CDH-6.3.4-1.cdh6.3.4.p0.6626826/lib/spark/python/pyspark/sql/functions.py", line 1253, in unix_timestamp
    return Column(sc._jvm.functions.unix_timestamp(_to_java_column(timestamp), format))
AttributeError: 'NoneType' object has no attribute '_jvm'
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon.read(PythonUDFRunner.scala:81)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon.read(PythonUDFRunner.scala:64)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$$anon.hasNext(WholeStageCodegenExec.scala:624)
at org.apache.spark.sql.execution.SparkPlan$$anonfun.apply(SparkPlan.scala:255)
at org.apache.spark.sql.execution.SparkPlan$$anonfun.apply(SparkPlan.scala:247)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$$anonfun$apply.apply(RDD.scala:858)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$$anonfun$apply.apply(RDD.scala:858)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun.apply(Executor.scala:407)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1408)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
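The traceback points at the real cause: unix_timestamp is a Spark SQL function that builds a JVM expression through the driver's SparkContext, but a UDF body runs on the executors, where no SparkContext exists (sc is None). That is why you get AttributeError: 'NoneType' object has no attribute '_jvm'. Only plain Python can run inside a UDF. Also note the return type: the difference between two timestamps is a number of seconds, not a TimestampType. Here is a minimal sketch that keeps your udf approach (timestamp column values arrive in the UDF as datetime.datetime objects, so ordinary datetime arithmetic works):

from pyspark.sql.functions import udf, col
from pyspark.sql.types import DoubleType

@udf(DoubleType())
def calc_time(start, end):
    # start and end are datetime.datetime values on the executor;
    # their difference is a timedelta, returned here in seconds as a float
    return (end - start).total_seconds()

new_df.withColumn("DIFF", calc_time(col("time1"), col("time2"))).show()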
Alternatively, you might want to try a pandas_udf, which performs the same arithmetic on whole pandas Series at once and is typically faster than a row-at-a-time udf:
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType
@pandas_udf(DoubleType())
def ts_diff(start, end):
    return (end - start).dt.total_seconds()
Then, using the new_df from your question:
>>> new_df.withColumn("DIFF", ts_diff("time1", "time2")).show()
+---+-----+-------------------+-------------------+----+
|age| name| time1| time2|DIFF|
+---+-----+-------------------+-------------------+----+
| 1|Alice|2021-07-25 17:21:58|2021-07-25 17:23:36|98.0|
| 2|Again|2021-07-25 17:21:58|2021-07-25 17:23:36|98.0|
+---+-----+-------------------+-------------------+----+
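Note that pandas_udf relies on Apache Arrow, so the pyarrow package must be installed on every node; it is available from Spark 2.3 onward, and CDH 6.3 ships Spark 2.4, so it should work in your environment. If you do not need a UDF at all, the same difference can be computed with built-in expressions alone, since casting a timestamp column to long yields epoch seconds:

>>> from pyspark.sql.functions import col
>>> new_df.withColumn("DIFF", col("time2").cast("long") - col("time1").cast("long")).show()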