spark中的pyspark daterange计算

pyspark daterange calculations in spark

我正在尝试处理每个用户的网站登录会话数据。我正在将 S3 会话日志文件读入 RDD。数据看起来像这样。

----------------------------------------
User | Site   | Session start   | Session end
---------------------------------------
Joe  |Waterloo| 9/21/19 3:04 AM |9/21/19 3:18 AM

Stacy|Kirkwood| 8/4/19 3:06 PM  |8/4/19 3:54 PM

John |Waterloo| 9/21/19 8:48 AM |9/21/19 9:05 AM

Stacy|Kirkwood| 8/4/19 4:16 PM  |8/4/19 5:41 PM
...
...

我想知道在给定的一天每小时有多少用户登录。

示例:我可能仅为 9/21/19 处理此数据。因此,我需要删除所有其他记录,然后对 2019 年 9 月 21 日的所有 24 小时内每小时的每一秒的用户会话求和。对于 2019 年 9 月 21 日的所有时间,输出应该可能是 24 行,然后计算一天中的每一秒(哎呀,逐秒数据!)。

在 pyspark 中使用 rdds 或 DF 可以做到这一点吗? (为构建网格的延迟道歉)。 谢谢

尝试检查一下:

初始化过滤器。

val filter = to_date("2019-09-21")
val startFilter = to_timestamp("2019-09-21 00:00:00.000")
val endFilter = to_timestamp("2019-09-21 23:59:59.999")

生成范围 (0 .. 23)。

hours = spark.range(24).collect()

获取与过滤器匹配的实际用户会话。

df = sessions.alias("s") \
    .where(filter >= to_date(s.start) & filter <= to_date(s.end)) \
    .select(s.user, \
            when(s.start < startFilter, startFilter).otherwise(s.start).alias("start"), \
            when(s.end > endFilter, endFilter).otherwise(s.end).alias("end"))

将匹配的用户会话与时间范围相结合。

df2 = df.join(hours, hours.id.between(hour(df.start), hour(df.end)), 'inner') \
    .select(df.user, hours.id.alias("hour"), \
        (when(hour(df.end) > hours.id, 360).otherwise(minute(df.end) * 60 + second(df.end)) - \
         when(hour(df.start) < hours.id, 0).otherwise(minute(df.start) * 60 + second(df.start))).alias("seconds"))

生成摘要:计算每个小时会话的用户数和总秒数。

df2.groupBy(df2.hour)\
    .agg(count(df2.user).alias("user counts"), \
         sum(dg2.seconds).alias("seconds")) \
    .show()

希望对您有所帮助。

我的数据集

data=[['Joe','Waterloo','9/21/19 3:04 AM','9/21/19 3:18 AM'],['Stacy','Kirkwood','8/4/19 3:06 PM','8/4/19 3:54 PM'],['John','Waterloo','9/21/19 8:48 AM','9/21/19 9:05 AM'],
          ['Stacy','Kirkwood','9/21/19 4:06 PM', '9/21/19 4:54 PM'],
         ['Mo','Hashmi','9/21/19 1:06 PM', '9/21/19 5:54 PM'],
         ['Murti','Hash','9/21/19 1:00 PM', '9/21/19 3:00 PM'],
         ['Floo','Shmi','9/21/19 9:10 PM', '9/21/19 11:54 PM']]
    cSchema = StructType([StructField("User", StringType())\
                          ,StructField("Site", StringType())
                          , StructField("Sesh-Start", StringType())
                          , StructField("Sesh-End", StringType())])
    df= spark.createDataFrame(data,schema=cSchema)
    display(df)

解析时间戳

df1=df.withColumn("Start", F.from_unixtime(F.unix_timestamp("Sesh-Start",'MM/dd/yyyy hh:mm aa'),'20yy-MM-dd HH:mm:ss').cast("timestamp")).withColumn("End", F.from_unixtime(F.unix_timestamp("Sesh-End",'MM/dd/yyyy hh:mm aa'),'20yy-MM-dd HH:mm:ss').cast("timestamp")).drop("Sesh-Start","Sesh-End")

构建并注册 udf,每人多小时

def yo(a,b):

  from datetime import datetime
  d1 = datetime.strptime(str(a), '%Y-%m-%d %H:%M:%S')
  d2 = datetime.strptime(str(b), '%Y-%m-%d %H:%M:%S')
  y=[]
  if d1.hour == d2.hour:
     y.append(d1.hour)
  else:
     for i in range(d1.hour,d2.hour+1):
        y.append(i)

  return y

rng= udf(yo, ArrayType(IntegerType()))

将小时列表分解为列

df2=df1.withColumn("new", rng(F.col("Start"),F.col("End"))).withColumn("new1",F.explode("new")).drop("new")

每小时获取秒数

df3=df2.withColumn("Seconds", when(F.hour("Start")==F.hour("End"), F.col("End").cast('long') - F.col("Start").cast('long'))
               .when(F.hour("Start")==F.col("new1"), 3600-F.minute("Start")*60)
               .when(F.hour("End")==F.col("new1"), F.minute("End")*60)
               .otherwise(3600))

创建临时视图并查询它

df3.createOrReplaceTempView("final")
display(spark.sql("Select new1, sum(Seconds) from final group by new1 order by new1"))

Lennart 的上述回答可能更高效,因为他使用连接来获取所有不同的时间,而我使用的 UDF 可能更慢。我的代码适用于可以在线任意时间的任何用户。我的数据仅使用了所需的日期,因此您可以使用上面给出的日期过滤器将查询限制在相关日期。Final output