PySpark window functions (lead, lag) in Synapse Workspace

Scenario:

The PySpark dataset looks like the following:

# Base schema and dataset for testing purposes

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Create a user-defined custom schema using StructType
schema = StructType([StructField('CaseNumber', StringType(), True),
                     StructField('StartTime', StringType(), True),
                     StructField('EndTime', StringType(), True)])

data = [
        {"CaseNumber": 'Ticket1', "StartTime": '1/22/19 10:00', "EndTime": ''},
        {"CaseNumber": 'Ticket1', "StartTime": '', "EndTime": '1/23/19 11:00'},
        {"CaseNumber": 'Ticket1', "StartTime": '1/25/19 7:00', "EndTime": ''},
        {"CaseNumber": 'Ticket1', "StartTime": '1/27/19 3:00', "EndTime": ''},
        {"CaseNumber": 'Ticket2', "StartTime": '1/29/19 10:00', "EndTime": ''},
        {"CaseNumber": 'Ticket2', "StartTime": '', "EndTime": '2/23/19 2:00'},
        {"CaseNumber": 'Ticket2', "StartTime": '3/25/19 7:00', "EndTime": ''},
        {"CaseNumber": 'Ticket2', "StartTime": '', "EndTime": '3/27/19 8:00'},
        {"CaseNumber": 'Ticket2', "StartTime": '', "EndTime": '3/27/19 10:00'},
        {"CaseNumber": 'Ticket3', "StartTime": '4/25/19 1:00', "EndTime": ''}
        ]

from pyspark.sql import SparkSession
#Create PySpark SparkSession
spark = SparkSession.builder \
    .master('local[1]') \
    .appName('SparkByExamples.com') \
    .getOrCreate()

# Create a dummy DataFrame from the test data
df1 = spark.createDataFrame(data, schema=schema)

df1.show()

The created dataset:

+----------+-------------+-------------+
|CaseNumber|    StartTime|      EndTime|
+----------+-------------+-------------+
|   Ticket1|1/22/19 10:00|          NaN|
|   Ticket1|          NaN|1/23/19 11:00|
|   Ticket1| 1/25/19 7:00|          NaN|
|   Ticket1| 1/27/19 3:00|          NaN|
|   Ticket2|1/29/19 10:00|          NaN|
|   Ticket2|          NaN| 2/23/19 2:00|
|   Ticket2| 3/25/19 7:00|          NaN|
|   Ticket2|          NaN| 3/27/19 8:00|
|   Ticket2|          NaN|3/27/19 10:00|
|   Ticket3| 4/25/19 1:00|          NaN|
+----------+-------------+-------------+

The desired output should be:

+----------+-------------+-------------+
|CaseNumber|    StartTime|      EndTime|
+----------+-------------+-------------+
|   Ticket1|1/22/19 10:00|1/23/19 11:00|
|   Ticket2|1/29/19 10:00| 2/23/19 2:00|
|   Ticket2| 3/25/19 7:00| 3/27/19 8:00|
+----------+-------------+-------------+

Apply the lead function to check whether an EndTime exists for the ticket:

from pyspark.sql.window import Window
import pyspark.sql.functions as psf

windowSpec = Window.partitionBy("CaseNumber").orderBy("CaseNumber")
df = df1.withColumn("lead",lead("EndTime",1).over(windowSpec))
df.show()

pysparkdf = df.toPandas()

import pandas as pd
tickets = pysparkdf.groupby('CaseNumber')

# Keep a row only when its lead value is present
# (note: this only filters out actual None values, not 'NaN' strings)
def isLeadnull(e):
    return e['lead'] is not None

my_list = []
for i, ticket in tickets:
    for j, e in ticket.iterrows():
        if isLeadnull(e):
            my_list.append({'CaseNumber': e['CaseNumber'], 'Start': e['StartTime'], 'EndTime': e['lead']})
        else:
            print(e['lead'], 'Do nothing as condition not met')

The output after this step is:

[{'CaseNumber': 'Ticket1',
  'Start': '1/22/19 10:00',
  'EndTime': '1/23/19 11:00'},
 {'CaseNumber': 'Ticket1', 'Start': 'NaN', 'EndTime': 'NaN'},
 {'CaseNumber': 'Ticket1', 'Start': '1/25/19 7:00', 'EndTime': 'NaN'},
 {'CaseNumber': 'Ticket2',
  'Start': '1/29/19 10:00',
  'EndTime': '2/23/19 2:00'},
 {'CaseNumber': 'Ticket2', 'Start': 'NaN', 'EndTime': 'NaN'},
 {'CaseNumber': 'Ticket2', 'Start': '3/25/19 7:00', 'EndTime': '3/27/19 8:00'},
 {'CaseNumber': 'Ticket2', 'Start': 'NaN', 'EndTime': '3/27/19 10:00'}]
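
For comparison, the lead idea can also stay entirely in PySpark instead of round-tripping through pandas. Below is a minimal sketch, assuming the empty StartTime/EndTime strings have already been replaced with nulls (as the answer below does) and ordering each ticket by whichever timestamp is present; w_lead, NextEnd and paired are just illustrative names:

from pyspark.sql import functions as F, Window

# Order each ticket by whichever timestamp the row carries,
# then pair every StartTime with the EndTime of the immediately following row
w_lead = Window.partitionBy("CaseNumber").orderBy(F.coalesce("StartTime", "EndTime"))

paired = df1.withColumn("NextEnd", F.lead("EndTime", 1).over(w_lead)) \
    .filter(F.col("StartTime").isNotNull() & F.col("NextEnd").isNotNull()) \
    .select("CaseNumber", "StartTime", F.col("NextEnd").alias("EndTime"))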

This is a kind of gaps-and-islands problem. You can identify the "islands" by building a group column with a conditional cumulative sum, then group by CaseNumber + group and aggregate the maximum StartTime and minimum EndTime per group.

from pyspark.sql import functions as F, Window

# First, convert the strings to timestamps and replace the empty strings with nulls
df1 = df1.withColumn("StartTime", F.to_timestamp("StartTime", "M/dd/yy H:mm")) \
    .withColumn("EndTime", F.to_timestamp("EndTime", "M/dd/yy H:mm")) \
    .replace("", None)

w = Window.partitionBy("CaseNumber").orderBy(F.coalesce("StartTime", "EndTime"))

df2 = df1.withColumn("group", F.sum(F.when(F.col("StartTime").isNotNull(), 1)).over(w)) \
    .groupBy("CaseNumber", "group") \
    .agg(F.max("StartTime").alias("StartTime"), F.min("EndTime").alias("EndTime")) \
    .filter(F.col("EndTime").isNotNull()) \
    .drop("group")

df2.show()
#+----------+-------------------+-------------------+
#|CaseNumber|          StartTime|            EndTime|
#+----------+-------------------+-------------------+
#|   Ticket1|2019-01-22 10:00:00|2019-01-23 11:00:00|
#|   Ticket2|2019-01-29 10:00:00|2019-02-23 02:00:00|
#|   Ticket2|2019-03-25 07:00:00|2019-03-27 08:00:00|
#+----------+-------------------+-------------------+
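
If you want the result to match the string format shown in the desired output, the aggregated timestamps can be rendered back with date_format; a small optional sketch (the "M/dd/yy H:mm" pattern mirrors the input format used above):

# Optional: format the timestamps back into the original "M/dd/yy H:mm" text form
df2.withColumn("StartTime", F.date_format("StartTime", "M/dd/yy H:mm")) \
    .withColumn("EndTime", F.date_format("EndTime", "M/dd/yy H:mm")) \
    .show()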

To understand the logic, you can show the intermediate group column step by step, before the grouping is applied:

df1.withColumn("group", F.sum(F.when(F.col("StartTime").isNotNull(), 1)).over(w)).show()

#+----------+-------------------+-------------------+-----+
#|CaseNumber|          StartTime|            EndTime|group|
#+----------+-------------------+-------------------+-----+
#|   Ticket1|2019-01-22 10:00:00|               null|    1|
#|   Ticket1|               null|2019-01-23 11:00:00|    1|
#|   Ticket1|2019-01-25 07:00:00|               null|    2|
#|   Ticket1|2019-01-27 03:00:00|               null|    3|
#|   Ticket2|2019-01-29 10:00:00|               null|    1|
#|   Ticket2|               null|2019-02-23 02:00:00|    1|
#|   Ticket2|2019-03-25 07:00:00|               null|    2|
#|   Ticket2|               null|2019-03-27 08:00:00|    2|
#|   Ticket2|               null|2019-03-27 10:00:00|    2|
#|   Ticket3|2019-04-25 01:00:00|               null|    1|
#+----------+-------------------+-------------------+-----+
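
The group column increases the way it does because an ordered window uses a running frame by default (from the start of the partition up to the current row), so the conditional sum counts how many non-null StartTime rows have been seen so far within the ticket. A sketch that writes this default frame out explicitly (w_explicit is just an illustrative name) should produce the same group values:

# Same window with the default running frame spelled out explicitly
w_explicit = Window.partitionBy("CaseNumber") \
    .orderBy(F.coalesce("StartTime", "EndTime")) \
    .rangeBetween(Window.unboundedPreceding, Window.currentRow)

df1.withColumn("group", F.sum(F.when(F.col("StartTime").isNotNull(), 1)).over(w_explicit)).show()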