spark中的pyspark daterange计算
pyspark daterange calculations in spark
我正在尝试处理每个用户的网站登录会话数据。我正在将 S3 会话日志文件读入 RDD。数据看起来像这样。
----------------------------------------
User | Site | Session start | Session end
---------------------------------------
Joe |Waterloo| 9/21/19 3:04 AM |9/21/19 3:18 AM
Stacy|Kirkwood| 8/4/19 3:06 PM |8/4/19 3:54 PM
John |Waterloo| 9/21/19 8:48 AM |9/21/19 9:05 AM
Stacy|Kirkwood| 8/4/19 4:16 PM |8/4/19 5:41 PM
...
...
我想知道在给定的一天每小时有多少用户登录。
示例:我可能仅为 9/21/19
处理此数据。因此,我需要删除所有其他记录,然后对 2019 年 9 月 21 日的所有 24 小时内每小时的每一秒的用户会话求和。对于 2019 年 9 月 21 日的所有时间,输出应该可能是 24 行,然后计算一天中的每一秒(哎呀,逐秒数据!)。
在 pyspark 中使用 rdds 或 DF 可以做到这一点吗?
(为构建网格的延迟道歉)。
谢谢
尝试检查一下:
初始化过滤器。
val filter = to_date("2019-09-21")
val startFilter = to_timestamp("2019-09-21 00:00:00.000")
val endFilter = to_timestamp("2019-09-21 23:59:59.999")
生成范围 (0 .. 23)。
hours = spark.range(24).collect()
获取与过滤器匹配的实际用户会话。
df = sessions.alias("s") \
.where(filter >= to_date(s.start) & filter <= to_date(s.end)) \
.select(s.user, \
when(s.start < startFilter, startFilter).otherwise(s.start).alias("start"), \
when(s.end > endFilter, endFilter).otherwise(s.end).alias("end"))
将匹配的用户会话与时间范围相结合。
df2 = df.join(hours, hours.id.between(hour(df.start), hour(df.end)), 'inner') \
.select(df.user, hours.id.alias("hour"), \
(when(hour(df.end) > hours.id, 360).otherwise(minute(df.end) * 60 + second(df.end)) - \
when(hour(df.start) < hours.id, 0).otherwise(minute(df.start) * 60 + second(df.start))).alias("seconds"))
生成摘要:计算每个小时会话的用户数和总秒数。
df2.groupBy(df2.hour)\
.agg(count(df2.user).alias("user counts"), \
sum(dg2.seconds).alias("seconds")) \
.show()
希望对您有所帮助。
我的数据集
data=[['Joe','Waterloo','9/21/19 3:04 AM','9/21/19 3:18 AM'],['Stacy','Kirkwood','8/4/19 3:06 PM','8/4/19 3:54 PM'],['John','Waterloo','9/21/19 8:48 AM','9/21/19 9:05 AM'],
['Stacy','Kirkwood','9/21/19 4:06 PM', '9/21/19 4:54 PM'],
['Mo','Hashmi','9/21/19 1:06 PM', '9/21/19 5:54 PM'],
['Murti','Hash','9/21/19 1:00 PM', '9/21/19 3:00 PM'],
['Floo','Shmi','9/21/19 9:10 PM', '9/21/19 11:54 PM']]
cSchema = StructType([StructField("User", StringType())\
,StructField("Site", StringType())
, StructField("Sesh-Start", StringType())
, StructField("Sesh-End", StringType())])
df= spark.createDataFrame(data,schema=cSchema)
display(df)
解析时间戳
df1=df.withColumn("Start", F.from_unixtime(F.unix_timestamp("Sesh-Start",'MM/dd/yyyy hh:mm aa'),'20yy-MM-dd HH:mm:ss').cast("timestamp")).withColumn("End", F.from_unixtime(F.unix_timestamp("Sesh-End",'MM/dd/yyyy hh:mm aa'),'20yy-MM-dd HH:mm:ss').cast("timestamp")).drop("Sesh-Start","Sesh-End")
构建并注册 udf,每人多小时
def yo(a,b):
from datetime import datetime
d1 = datetime.strptime(str(a), '%Y-%m-%d %H:%M:%S')
d2 = datetime.strptime(str(b), '%Y-%m-%d %H:%M:%S')
y=[]
if d1.hour == d2.hour:
y.append(d1.hour)
else:
for i in range(d1.hour,d2.hour+1):
y.append(i)
return y
rng= udf(yo, ArrayType(IntegerType()))
将小时列表分解为列
df2=df1.withColumn("new", rng(F.col("Start"),F.col("End"))).withColumn("new1",F.explode("new")).drop("new")
每小时获取秒数
df3=df2.withColumn("Seconds", when(F.hour("Start")==F.hour("End"), F.col("End").cast('long') - F.col("Start").cast('long'))
.when(F.hour("Start")==F.col("new1"), 3600-F.minute("Start")*60)
.when(F.hour("End")==F.col("new1"), F.minute("End")*60)
.otherwise(3600))
创建临时视图并查询它
df3.createOrReplaceTempView("final")
display(spark.sql("Select new1, sum(Seconds) from final group by new1 order by new1"))
Lennart 的上述回答可能更高效,因为他使用连接来获取所有不同的时间,而我使用的 UDF 可能更慢。我的代码适用于可以在线任意时间的任何用户。我的数据仅使用了所需的日期,因此您可以使用上面给出的日期过滤器将查询限制在相关日期。Final output
我正在尝试处理每个用户的网站登录会话数据。我正在将 S3 会话日志文件读入 RDD。数据看起来像这样。
----------------------------------------
User | Site | Session start | Session end
---------------------------------------
Joe |Waterloo| 9/21/19 3:04 AM |9/21/19 3:18 AM
Stacy|Kirkwood| 8/4/19 3:06 PM |8/4/19 3:54 PM
John |Waterloo| 9/21/19 8:48 AM |9/21/19 9:05 AM
Stacy|Kirkwood| 8/4/19 4:16 PM |8/4/19 5:41 PM
...
...
我想知道在给定的一天每小时有多少用户登录。
示例:我可能仅为 9/21/19
处理此数据。因此,我需要删除所有其他记录,然后对 2019 年 9 月 21 日的所有 24 小时内每小时的每一秒的用户会话求和。对于 2019 年 9 月 21 日的所有时间,输出应该可能是 24 行,然后计算一天中的每一秒(哎呀,逐秒数据!)。
在 pyspark 中使用 rdds 或 DF 可以做到这一点吗? (为构建网格的延迟道歉)。 谢谢
尝试检查一下:
初始化过滤器。
val filter = to_date("2019-09-21")
val startFilter = to_timestamp("2019-09-21 00:00:00.000")
val endFilter = to_timestamp("2019-09-21 23:59:59.999")
生成范围 (0 .. 23)。
hours = spark.range(24).collect()
获取与过滤器匹配的实际用户会话。
df = sessions.alias("s") \
.where(filter >= to_date(s.start) & filter <= to_date(s.end)) \
.select(s.user, \
when(s.start < startFilter, startFilter).otherwise(s.start).alias("start"), \
when(s.end > endFilter, endFilter).otherwise(s.end).alias("end"))
将匹配的用户会话与时间范围相结合。
df2 = df.join(hours, hours.id.between(hour(df.start), hour(df.end)), 'inner') \
.select(df.user, hours.id.alias("hour"), \
(when(hour(df.end) > hours.id, 360).otherwise(minute(df.end) * 60 + second(df.end)) - \
when(hour(df.start) < hours.id, 0).otherwise(minute(df.start) * 60 + second(df.start))).alias("seconds"))
生成摘要:计算每个小时会话的用户数和总秒数。
df2.groupBy(df2.hour)\
.agg(count(df2.user).alias("user counts"), \
sum(dg2.seconds).alias("seconds")) \
.show()
希望对您有所帮助。
我的数据集
data=[['Joe','Waterloo','9/21/19 3:04 AM','9/21/19 3:18 AM'],['Stacy','Kirkwood','8/4/19 3:06 PM','8/4/19 3:54 PM'],['John','Waterloo','9/21/19 8:48 AM','9/21/19 9:05 AM'],
['Stacy','Kirkwood','9/21/19 4:06 PM', '9/21/19 4:54 PM'],
['Mo','Hashmi','9/21/19 1:06 PM', '9/21/19 5:54 PM'],
['Murti','Hash','9/21/19 1:00 PM', '9/21/19 3:00 PM'],
['Floo','Shmi','9/21/19 9:10 PM', '9/21/19 11:54 PM']]
cSchema = StructType([StructField("User", StringType())\
,StructField("Site", StringType())
, StructField("Sesh-Start", StringType())
, StructField("Sesh-End", StringType())])
df= spark.createDataFrame(data,schema=cSchema)
display(df)
解析时间戳
df1=df.withColumn("Start", F.from_unixtime(F.unix_timestamp("Sesh-Start",'MM/dd/yyyy hh:mm aa'),'20yy-MM-dd HH:mm:ss').cast("timestamp")).withColumn("End", F.from_unixtime(F.unix_timestamp("Sesh-End",'MM/dd/yyyy hh:mm aa'),'20yy-MM-dd HH:mm:ss').cast("timestamp")).drop("Sesh-Start","Sesh-End")
构建并注册 udf,每人多小时
def yo(a,b):
from datetime import datetime
d1 = datetime.strptime(str(a), '%Y-%m-%d %H:%M:%S')
d2 = datetime.strptime(str(b), '%Y-%m-%d %H:%M:%S')
y=[]
if d1.hour == d2.hour:
y.append(d1.hour)
else:
for i in range(d1.hour,d2.hour+1):
y.append(i)
return y
rng= udf(yo, ArrayType(IntegerType()))
将小时列表分解为列
df2=df1.withColumn("new", rng(F.col("Start"),F.col("End"))).withColumn("new1",F.explode("new")).drop("new")
每小时获取秒数
df3=df2.withColumn("Seconds", when(F.hour("Start")==F.hour("End"), F.col("End").cast('long') - F.col("Start").cast('long'))
.when(F.hour("Start")==F.col("new1"), 3600-F.minute("Start")*60)
.when(F.hour("End")==F.col("new1"), F.minute("End")*60)
.otherwise(3600))
创建临时视图并查询它
df3.createOrReplaceTempView("final")
display(spark.sql("Select new1, sum(Seconds) from final group by new1 order by new1"))