
What is the best way to extract time resolution from timestamp for specific value in column?

Let's say I have the following Spark DataFrame:

|timestamp                 |name |
|2021-11-06 16:29:00.004204|Alice|
|2021-11-06 16:29:00.004204|Bob  |

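Now I want to count the records/rows for a specific name == 'Alice' based on the time of day of the timestamp (per 12-hour half of the day and per 8-hour shift), and return the results in a Spark DataFrame. I tried the following without success: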

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import dayofmonth, dayofweek
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

spark = SparkSession.builder.getOrCreate()

# Only the name is supplied here; timestamp and date start out null and are generated below
data = [{'name': 'Alice'},
        {'name': 'Bob'}]
schema = StructType([
    StructField("timestamp", TimestampType(), True),
    StructField("date",      StringType(), True),
    StructField("name",      StringType(), True),
])
# Create a Spark DataFrame
sdf = spark.createDataFrame(data=data, schema=schema)

#Generate data and timestamp
new_df = sdf.withColumn('timestamp',    F.current_timestamp().cast("timestamp")) \
            .withColumn('date',         F.current_date().cast("date")) \
            .withColumn('day_of_month', dayofmonth('timestamp')) \
            .withColumn('day_of_week', ((dayofweek('timestamp')+5)%7)+1)  # start of the week as a Monday = 1 (by default is Sunday = 1)
            #.withColumn("No. records in 1st 12-hrs",from_unixtime(unix_timestamp(col("timestamp"),"yyyy-MM-dd HH:mm:ss"),"HH:mm:ss")) \
            #.filter(col("timestamp").between("00:00","11:59")) \
            #.groupBy("No. records in 1st 12-hrs", "name").sum("Count") \
            #.withColumn("No. records in 1st 12-hrs",from_unixtime(unix_timestamp(col("timestamp"),"yyyy-MM-dd HH:mm:ss"),"HH:mm:ss")) \
            #.filter(col("timestamp").between("12:00","23:59")) \
            #.groupBy("No. records in 1st 12-hrs" , "name").sum("Count") \

            #.withColumn('# No. records in 1st 8-hrs shift (00:00-07:59:59)', ????('timestamp')) \
            #.withColumn('# No. records in 2nd 8-hrs shift (08:00-15:59:59)', ????('timestamp')) \
            #.withColumn('# No. records in 3rd 8-hrs shift (16:00-23:59:59)', ????('timestamp')) \
new_df.show(truncate = False)

This is my output so far, which you can also find in the Colab notebook:

|timestamp                 |date      |name |day_of_month|day_of_week|
|2021-11-06 16:17:43.698815|2021-11-06|Alice|6           |6          |
|2021-11-06 16:17:43.698815|2021-11-06|Bob  |6           |6          |
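
As a quick sanity check on the day_of_week arithmetic above, the remapping from Spark's Sunday = 1 numbering to Monday = 1 can be reproduced in plain Python. This is only a sketch: the helper names spark_dayofweek and monday_first are made up for illustration, and spark_dayofweek merely imitates the numbering that Spark's dayofweek returns.

```python
from datetime import date


def spark_dayofweek(d):
    """Imitate Spark's dayofweek numbering: Sunday = 1, ..., Saturday = 7."""
    return d.isoweekday() % 7 + 1


def monday_first(spark_dow):
    """Same arithmetic as the day_of_week column: Monday = 1, ..., Sunday = 7."""
    return ((spark_dow + 5) % 7) + 1


# 2021-11-06 is a Saturday, the date shown in the output table
saturday = date(2021, 11, 6)
remapped = monday_first(spark_dayofweek(saturday))
print(remapped)
```

For a Saturday this gives 6, which agrees with the day_of_week value in the table above.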

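Alternatively, I looked at several related posts and answers, but I could not apply them to the main Spark DataFrame for a specific name together with the working-shift ranges.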

Please note that I am not interested in using a UDF or in working around this via toPandas().


So the expected result for the specific name == 'Alice' should look like this:

|No. records in 1st 12-hrs |No. records in 2nd 12-hrs |No. records in 1st 8-hrs  |No. records in 2nd 8-hrs  |No. records in 3rd 8-hrs  |
|                          |                          |                          |                          |                          |

You can do this by checking whether the hour part of the timestamp falls within [0, 11], [12, 23], and so on:

import pyspark.sql.functions as F

# new_df is the DataFrame from the question, with the timestamp column populated.
# between() is inclusive on both ends; casting the boolean to int lets sum() count matches.
result = new_df.groupBy("name").agg(
    F.sum(F.hour("timestamp").between(0, 11).cast("int")).alias("1st-12-hrs"),
    F.sum(F.hour("timestamp").between(12, 23).cast("int")).alias("2nd-12-hrs"),
    F.sum(F.hour("timestamp").between(0, 7).cast("int")).alias("1st-8-hrs"),
    F.sum(F.hour("timestamp").between(8, 15).cast("int")).alias("2nd-8-hrs"),
    F.sum(F.hour("timestamp").between(16, 23).cast("int")).alias("3rd-8-hrs"),
)
result.show(truncate=False)

# Output for the sample rows, whose timestamps fall in hour 16:
#|name |1st-12-hrs|2nd-12-hrs|1st-8-hrs|2nd-8-hrs|3rd-8-hrs|
#|Bob  |0         |1         |0        |0        |1        |
#|Alice|0         |1         |0        |0        |1        |