Unable to call PySpark UDF function

I am trying to use a UDF, but I get an error:

import time
import datetime
from pyspark.sql.functions import lit, unix_timestamp, udf, col
from pyspark.sql.types import TimestampType, DecimalType

data = [{'name': 'Alice', 'age': 1}, {'name': 'Again', 'age': 2}]
df = spark.createDataFrame(data)

timestamp1 = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
timestamp2 = datetime.datetime.fromtimestamp(time.time() + 90).strftime('%Y-%m-%d %H:%M:%S')


def calc_time(start, end):
    timefmt = "yyyy-MM-dd'T'HH:mm:ss"
    return unix_timestamp(end, format=timefmt) - unix_timestamp(start, format=timefmt)


calc_time_udf = udf(lambda start, end: calc_time(start, end), TimestampType())

new_df = (df.withColumn('time1', unix_timestamp(lit(timestamp1),'yyyy-MM-dd HH:mm:ss').cast("timestamp"))
          .withColumn('time2', unix_timestamp(lit(timestamp2),'yyyy-MM-dd HH:mm:ss').cast("timestamp")))


new_df.withColumn("DIFF", calc_time_udf(col("time1"), col("time2")).cast(DecimalType(20, 6))).show()

Error stack trace:

File "/opt/cloudera/parcels/CDH-6.3.4-1.cdh6.3.4.p0.6626826/lib/spark/python/pyspark/sql/functions.py", line 1253, in unix_timestamp
    return Column(sc._jvm.functions.unix_timestamp(_to_java_column(timestamp), format))
AttributeError: 'NoneType' object has no attribute '_jvm'

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon.read(PythonUDFRunner.scala:81)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon.read(PythonUDFRunner.scala:64)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$$anon.hasNext(WholeStageCodegenExec.scala:624)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun.apply(SparkPlan.scala:255)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun.apply(SparkPlan.scala:247)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$$anonfun$apply.apply(RDD.scala:858)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$$anonfun$apply.apply(RDD.scala:858)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun.apply(Executor.scala:407)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1408)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
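The traceback points at the root cause: `unix_timestamp` is a JVM-backed Column function, but the UDF body runs on the executors, where PySpark's `SparkContext` (`sc`) is `None`, hence the `AttributeError`. A regular UDF must do its work in plain Python instead. A minimal sketch of such a body (assuming, as in the code above, the columns were cast to timestamp, so the UDF receives Python `datetime` objects):

```python
from datetime import datetime, timedelta

def calc_time(start, end):
    # Pure Python, no SparkContext needed; safe to run inside a regular UDF
    return (end - start).total_seconds()

# Local check with plain datetime objects, as a UDF would receive them
t1 = datetime(2021, 7, 25, 17, 21, 58)
t2 = t1 + timedelta(seconds=98)
print(calc_time(t1, t2))  # 98.0
```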

You may want to try this approach:

from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType

@pandas_udf(DoubleType())
def ts_diff(start, end):
    # Vectorized: start and end arrive as pandas Series of timestamps
    return (end - start).dt.total_seconds()

Then, using the `new_df` from your question:

>>> new_df.withColumn("DIFF", ts_diff("time1", "time2")).show()
+---+-----+-------------------+-------------------+----+
|age| name|              time1|              time2|DIFF|
+---+-----+-------------------+-------------------+----+
|  1|Alice|2021-07-25 17:21:58|2021-07-25 17:23:36|98.0|
|  2|Again|2021-07-25 17:21:58|2021-07-25 17:23:36|98.0|
+---+-----+-------------------+-------------------+----+
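To see what the vectorized body computes, the same Series arithmetic can be checked locally with plain pandas, no Spark needed (a sketch using the timestamps from the output above):

```python
import pandas as pd

start = pd.Series(pd.to_datetime(["2021-07-25 17:21:58", "2021-07-25 17:21:58"]))
end = pd.Series(pd.to_datetime(["2021-07-25 17:23:36", "2021-07-25 17:23:36"]))

# Same expression as the pandas_udf body: elementwise difference in seconds
diff = (end - start).dt.total_seconds()
print(diff.tolist())  # [98.0, 98.0]
```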