UDF 函数如何在 pyspark 中以日期作为参数工作?

How UDF function works in pyspark with dates as arguments?

我前段时间开始使用 pyspark 世界,我正在用一种算法绞尽脑汁,最初我想创建一个函数来计算两个日期之间的月差,我知道有一个函数可以做到这一点(months_between),但它的工作原理与我想要的有点不同,我想从两个日期中提取月份并减去而不考虑天数,只考虑月份和年份,重点是,我可以通过操作基数、用月份创建新列并减去来做到这一点,但我想将其作为 UDF 函数来执行,如下所示:

from datetime import datetime
import pyspark.sql.functions as f

base_study = spark.createDataFrame([("1", "2009-01-31", "2007-01-31"),("2","2009-01-31","2011-01-31")], ['ID', 'A', 'B'])
base_study = base_study.withColumn("A",f.to_date(base_study["A"], 'yyyy-MM-dd'))
base_study = base_study.withColumn("B",f.to_date(base_study["B"], 'yyyy-MM-dd'))


def intckSasFunc(RecentDate, PreviousDate):
    RecentDate = f.month("RecentDate")
    PreviousDate = f.month("PreviousDate")
    months_diff = (RecentDate.year - PreviousDate.year) * 12 + (RecentDate.month - PreviousDate.month)
    return months_diff
  
intckSasFuncUDF = f.udf(intckSasFunc, IntegerType())

base_study.withColumn('Result', intckSasFuncUDF(f.col('B'), f.col('A') ))

我做错了什么?

另一个问题:当我在 UDF 函数中传递参数时,它们是一个一个发送还是传递整个列?这个专栏是一个系列吗?

谢谢!

我找到了一个解决方案并将其升级以处理缺失。

from datetime import datetime
import pyspark.sql.functions as f

base_study = spark.createDataFrame([("1", None, "2015-01-01"),("2","2015-01-31","2015-01-31")], ['ID', 'A', 'B'])
base_study = base_study.withColumn("A",f.to_date(base_study["A"], 'yyyy-MM-dd'))
base_study = base_study.withColumn("B",f.to_date(base_study["B"], 'yyyy-MM-dd'))


def intckSasFunc(RecentDate, PreviousDate):

  if (PreviousDate and RecentDate) is not None:
    months_diff = (RecentDate.year - PreviousDate.year) * 12 + (RecentDate.month - PreviousDate.month)
    return months_diff
  else:
    return None 

intckSasFuncUDF = f.udf(lambda x,y:intckSasFunc(x,y) , IntegerType())

display(base_study.withColumn('Result', intckSasFuncUDF(f.col('B'), f.col('A'))))

对于那些有疑问的人,就像我一样,该函数一次处理一条记录,就好像它是一个普通的 python 函数一样,我不能在这个里面使用 pyspark.sql 函数UDF,报错,貌似这些函数只用在pypsark的列中,UDF内部是逐行转换的。