如何计算 Pyspark 2.2.0 中不包括周末的日期之间的差异

How to calculate difference between dates excluding weekends in Pyspark 2.2.0

我有下面的 pyspark df,可以通过代码重新创建

df = spark.createDataFrame([(1, "John Doe", "2020-11-30"),(2, "John Doe", "2020-11-27"),(3, "John Doe", "2020-11-29")],
                            ("id", "name", "date")) 

   +---+--------+----------+
| id|    name|      date|
+---+--------+----------+
|  1|John Doe|2020-11-30|
|  2|John Doe|2020-11-27|
|  3|John Doe|2020-11-29|
+---+--------+----------+

我想创建一个 udf 来计算 2 行日期(使用滞后函数)之间的差异,不包括周末,因为 pyspark 2.2.0 没有这样做的内置函数。例如。 2020-11-30 和 2020-11-27 之间的差异应该为 1,因为它们分别是星期一和星期五。

我试图在 的帮助下创建以下内容:

from pyspark.sql.functions import udf
import numpy as np
workdaUDF = udf(lambda z: workdays(z),IntegerType())
def workdays():
date1 = df.select(F.col('date')).collect()[1][0]
date2 = df.select(F.col('date')).collect()[0][0]
date_diff = np.busday_count(date1,date2)
return date_diff

df.withColumn("date_dif",workdaysUDF(F.col("date"))).show(truncate=False)

但是我得到以下错误

PicklingError: Could not serialize object: Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

任何关于如何在我的数据框的每一行上进行这项工作的帮助都会非常有帮助。

PS :我的 date1 和 date2 变量需要是动态的,具体取决于应用函数的日期值。另外,由于数据帧的大小,我无法使用 pandas,我找到了多个解决方案。

提前致谢。

您不能在 UDF 中调用 collect。 UDF只能传入columns,所以应该传入date列和lag date列,如下图:

import numpy as np
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType

df = spark.createDataFrame([
    (1, "John Doe", "2020-11-30"),
    (2, "John Doe", "2020-11-27"),
    (3, "John Doe", "2020-11-29")],
    ("id", "name", "date")
) 

workdaysUDF = F.udf(lambda date1, date2: int(np.busday_count(date2, date1)) if (date1 is not None and date2 is not None) else None, IntegerType())
df = df.withColumn("date_dif", workdaysUDF(F.col('date'), F.lag(F.col('date')).over(Window.partitionBy('name').orderBy('id'))))
df.show()

+---+--------+----------+--------+
| id|    name|      date|date_dif|
+---+--------+----------+--------+
|  1|John Doe|2020-11-30|    null|
|  2|John Doe|2020-11-27|      -1|
|  3|John Doe|2020-11-29|       1|
+---+--------+----------+--------+