GroupBy 在 PySpark DataFrame 问题上使用时间频率
GroupBy using Time Frequency on PySpark DataFrame Issue
我是 PySpark 的新手。
我正在尝试执行 GroupBy 操作来获取聚合计数。但我无法根据时间频率执行 groupBy。我需要使用字段“CAPTUREDTIME、NODE、CHANNEL、LOCATION、TACK”执行“groupBy”。但是在这个 groupBy 中,我应该使用“CAPTUREDTIME”字段基于“每小时”、“每天”、“每周”、“每月”进行分组。
请查找以下样本数据。
-----------------+------+------+--------+----------+--------------
|CAPTUREDTIME| NODE| CHANNEL | LOCATION| TACK
+-----------------+------+------+--------+----------+-------------
|20-05-09 03:06:21| PUSC_RES| SIMPLEX| NORTH_AL| UE220034
|20-05-09 04:33:04| PUSC_RES| SIMPLEX| SOUTH_AL| UE220034
|20-05-09 12:04:52| TESC_RES| SIMPLEX| NORTH_AL| UE220057
|20-05-10 04:24:09| TESC_RES| SIMPLEX| NORTH_AL| UE220057
|20-05-10 04:33:04| PUSC_RES| SIMPLEX| SOUTH_AL| UE220034
|20-04-09 10:57:48| TESC_RES| SIMPLEX| NORTH_AL| UE220057
|20-04-09 12:12:26| TESC_RES| SIMPLEX| NORTH_AL| UE220057
|20-04-09 03:26:33| PUSC_RES| SIMPLEX| NORTH_AL| UE220071
+-----------------+------+------+--------+----------+-------------
我使用了下面的pyspark代码
df = df.groupby("CAPTUREDTIME", "NODE", "CHANNEL", "LOCATION", "TACK").agg(
func.count("TACK").alias("count")
)
如何扩展上述代码以在 'hourly'、'daily'、'weekly'、'monthly' 上分组?
我需要以下格式的输出(共享示例输出):
每小时:
|拍摄时间|节点|频道 |地点|大头钉|计数
|20-05-0903:00:00| PUSC_RES|单纯形| NORTH_AL| UE220034| 2
|20-05-0904:00:00| PUSC_RES|单纯形| SOUTH_AL| UE220034| 2
每日:
|拍摄时间|节点|频道 |地点|大头钉|计数
|20-05-0900:00:00| PUSC_RES|单纯形| NORTH_AL| UE220034| 1
|20-05-0900:00:00| PUSC_RES|单纯形| SOUTH_AL| UE220034| 2
|20-05-0900:00:00| TESC_RES|单纯形| NORTH_AL| UE220057| 3
每周:
|拍摄时间|节点|频道 |地点|大头钉|计数
|20-05-0900:00:00| PUSC_RES|单纯形| NORTH_AL| UE220034| 1
每月:
|拍摄时间|节点|频道 |地点|大头钉|计数
|20-05-0900:00:00| PUSC_RES|单纯形| NORTH_AL| UE220034| 1
Spark 为日期操作提供了一个相对丰富的库。您的问题的答案是提取日期部分和显示日期格式的组合。
我重新创建了你的数据如下:
val capturesRaw = spark.read
.option("ignoreLeadingWhiteSpace", "true")
.option("ignoreTrailingWhiteSpace", "true")
.option("delimiter", "|")
.option("header", "true")
.csv(spark.sparkContext.parallelize("""
CAPTUREDTIME| NODE| CHANNEL | LOCATION| TACK
20-05-09 03:06:21| PUSC_RES| SIMPLEX| NORTH_AL| UE220034
20-05-09 04:33:04| PUSC_RES| SIMPLEX| SOUTH_AL| UE220034
20-05-09 12:04:52| TESC_RES| SIMPLEX| NORTH_AL| UE220057
20-05-10 04:24:09| TESC_RES| SIMPLEX| NORTH_AL| UE220057
20-05-10 04:33:04| PUSC_RES| SIMPLEX| SOUTH_AL| UE220034
20-04-09 10:57:48| TESC_RES| SIMPLEX| NORTH_AL| UE220057
20-04-09 12:12:26| TESC_RES| SIMPLEX| NORTH_AL| UE220057
20-04-09 03:26:33| PUSC_RES| SIMPLEX| NORTH_AL| UE220071"""
.split("\n")).toDS)
注意:我用的是Scala,但是代码差别很小希望你能看懂。我相信开头的 val
实际上是唯一的区别。
我假设前两位数字代表两位数的年份?要继续,我们需要确保 capturedtime 是一个时间戳。我更喜欢使用 SQL 来操作数据帧,因为我发现它更具可读性。
spark.sql("""select to_timestamp('20' || capturedtime) capturedtime, NODE, CHANNEL,
LOCATION, TACK from captures_raw""")
.createOrReplaceTempView("captures_raw")
如果您愿意,可以直接在数据帧上完成同样的事情
capturesRaw.withColumn("capturedtimestamp",
to_timestamp(col("capturedtime"), "yy-MM-dd hh:mm:ss"))
此时,我们可以创建您请求的字段:
spark.sql("""select capturedtime,
month(capturedtime) cap_month,
weekofyear(capturedtime) cap_week,
day(capturedtime) cap_day,
hour(capturedtime) cap_hr, NODE, CHANNEL, LOCATION, TACK
from captures_raw""").createOrReplaceTempView("captures")
创建字段后,我们就可以回答您的问题了。例如,要单独按月汇总(不包括其余时间戳),请按以下步骤操作:
spark.sql("""select date_format(capturedtime, "yyyy-MM") year_month, cap_month,
cap_week, cap_day, cap_hr, count(*) count
from captures
group by 1,2,3,4,5""").show
哪个returns
+----------+---------+--------+-------+------+-----+
|year_month|cap_month|cap_week|cap_day|cap_hr|count|
+----------+---------+--------+-------+------+-----+
| 2020-04| 4| 15| 9| 3| 1|
| 2020-04| 4| 15| 9| 10| 1|
| 2020-05| 5| 19| 9| 4| 1|
| 2020-05| 5| 19| 9| 12| 1|
| 2020-04| 4| 15| 9| 12| 1|
| 2020-05| 5| 19| 9| 3| 1|
| 2020-05| 5| 19| 10| 4| 2|
+----------+---------+--------+-------+------+-----+
每天的总结可以这样产生:
spark.sql("""select date_format(capturedtime, "yyyy-MM-dd") captured_date,
cap_day, cap_hr, count(*) count
from captures
group by 1,2,3""").show
+-------------+-------+------+-----+
|captured_date|cap_day|cap_hr|count|
+-------------+-------+------+-----+
| 2020-05-10| 10| 4| 2|
| 2020-04-09| 9| 12| 1|
| 2020-05-09| 9| 4| 1|
| 2020-05-09| 9| 12| 1|
| 2020-04-09| 9| 3| 1|
| 2020-04-09| 9| 10| 1|
| 2020-05-09| 9| 3| 1|
+-------------+-------+------+-----+
您有两种方法来回答您的问题,要么将时间戳转换为您想要分组的日期粒度,要么(如您在评论中所说)使用 sql window 函数按您喜欢的时间间隔分组。
只知道不能通过 Spark 中的 window SQL 函数进行每月汇总。
在这里你可以看到代码,前三个示例使用 window SQL 函数,最后一个示例按月转换时间戳,然后按每一列分组。
df = spark.createDataFrame(
[
("20-05-09 03:06:21", "PUSC_RES", "SIMPLEX", "NORTH_AL", "UE220034"),
("20-05-09 04:33:04", "PUSC_RES", "SIMPLEX", "SOUTH_AL", "UE220034"),
("20-05-09 12:04:52", "TESC_RES", "SIMPLEX", "NORTH_AL", "UE220057"),
("20-05-10 04:24:09", "TESC_RES", "SIMPLEX", "NORTH_AL", "UE220057"),
("20-05-10 04:33:04", "PUSC_RES", "SIMPLEX", "SOUTH_AL", "UE220034"),
("20-04-09 10:57:48", "TESC_RES", "SIMPLEX", "NORTH_AL", "UE220057"),
("20-04-09 12:12:26", "TESC_RES", "SIMPLEX", "NORTH_AL", "UE220057"),
("20-04-09 03:26:33", "PUSC_RES", "SIMPLEX", "NORTH_AL", "UE220071")
],
['CAPTUREDTIME', 'NODE', 'CHANNEL', 'LOCATION', 'TACK']
)
from pyspark.sql.functions import col, count, date_format, date_sub, date_trunc, month, next_day, to_timestamp, weekofyear, window, year
每小时
这个我还是保持window的逻辑,这样大家可以参考一下Spark中的各种可能性。在显示数据帧之前,我只 select 最后 window 的开头。
hourly = (
df
.withColumn("captured_time", to_timestamp(col('CAPTUREDTIME'), 'yy-MM-dd HH:mm:ss'))
.groupBy(window(col("captured_time"), "1 hour").alias("captured_time"), "NODE", "CHANNEL", "LOCATION", "TACK")
.agg(count("*"))
.withColumn("captured_time_hour", col("captured_time.start"))
.drop("captured_time")
)
hourly.sort("captured_time_hour").show(100, False)
每天
通过date_trunc
函数,我可以只考虑日期来截断时间戳
daily = (
df
.withColumn("captured_time", to_timestamp(col('CAPTUREDTIME'), 'yy-MM-dd HH:mm:ss'))
.withColumn("captured_time_day", date_trunc("day", col("captured_time")))
.groupBy("captured_time_day", "NODE", "CHANNEL", "LOCATION", "TACK")
.agg(count("*"))
)
daily.sort("captured_time_day").show(100, False)
每周
这个有点棘手。首先,我使用星期一的 next_day
函数。如果您将周日视为一周的开始,请根据它更新此代码,但我将周一视为一周的开始(这取决于 SQL 我相信的方言和地区)
然后我们还可以添加一个weekofyear
函数来根据需要检索周数
weekly = (
df
.withColumn("captured_time", to_timestamp(col('CAPTUREDTIME'), 'yy-MM-dd HH:mm:ss'))
.withColumn("start_day", date_sub(next_day(col("captured_time"), "monday"), 7))
.groupBy("start_day", "NODE", "CHANNEL", "LOCATION", "TACK")
.agg(count("*"))
.withColumn("start_day", to_timestamp(col("start_day")))
.withColumn("week_of_year", weekofyear(col("start_day")))
)
weekly.sort("start_day").show(100, False)
每月
我们只是将时间戳格式化为日期,然后将其转换回时间戳。这样做只是为了展示另一种方法。我们可以将时间戳截断为日常用例。我还展示了两种提取月份名称和缩写的方法。请注意您的 Spark 版本,因为它已在 Spark 3.0.0
中进行测试
monthly = (
df
.withColumn("captured_time", to_timestamp(col('CAPTUREDTIME'), 'yy-MM-dd HH:mm:ss'))
.withColumn("captured_time_month", date_format(col('captured_time'), '1/M/yyyy'))
.groupBy(col("captured_time_month"), "NODE", "CHANNEL", "LOCATION", "TACK")
.agg(count("*").alias("Count TACK"))
.withColumn("captured_time_month", to_timestamp(col("captured_time_month"), '1/M/yyyy'))
.withColumn("month", month(col("captured_time_month")))
.withColumn("month_abbr", date_format(col("captured_time_month"),'MMM'))
.withColumn("full_month_name", date_format(col("captured_time_month"),'MMMM'))
)
monthly.sort("captured_time_month").show(100, False)
再见!
我是 PySpark 的新手。
我正在尝试执行 GroupBy 操作来获取聚合计数。但我无法根据时间频率执行 groupBy。我需要使用字段“CAPTUREDTIME、NODE、CHANNEL、LOCATION、TACK”执行“groupBy”。但是在这个 groupBy 中,我应该使用“CAPTUREDTIME”字段基于“每小时”、“每天”、“每周”、“每月”进行分组。
请查找以下样本数据。
-----------------+------+------+--------+----------+--------------
|CAPTUREDTIME| NODE| CHANNEL | LOCATION| TACK
+-----------------+------+------+--------+----------+-------------
|20-05-09 03:06:21| PUSC_RES| SIMPLEX| NORTH_AL| UE220034
|20-05-09 04:33:04| PUSC_RES| SIMPLEX| SOUTH_AL| UE220034
|20-05-09 12:04:52| TESC_RES| SIMPLEX| NORTH_AL| UE220057
|20-05-10 04:24:09| TESC_RES| SIMPLEX| NORTH_AL| UE220057
|20-05-10 04:33:04| PUSC_RES| SIMPLEX| SOUTH_AL| UE220034
|20-04-09 10:57:48| TESC_RES| SIMPLEX| NORTH_AL| UE220057
|20-04-09 12:12:26| TESC_RES| SIMPLEX| NORTH_AL| UE220057
|20-04-09 03:26:33| PUSC_RES| SIMPLEX| NORTH_AL| UE220071
+-----------------+------+------+--------+----------+-------------
我使用了下面的pyspark代码
df = df.groupby("CAPTUREDTIME", "NODE", "CHANNEL", "LOCATION", "TACK").agg(
func.count("TACK").alias("count")
)
如何扩展上述代码以在 'hourly'、'daily'、'weekly'、'monthly' 上分组?
我需要以下格式的输出(共享示例输出):
每小时:
|拍摄时间|节点|频道 |地点|大头钉|计数
|20-05-0903:00:00| PUSC_RES|单纯形| NORTH_AL| UE220034| 2
|20-05-0904:00:00| PUSC_RES|单纯形| SOUTH_AL| UE220034| 2
每日:
|拍摄时间|节点|频道 |地点|大头钉|计数
|20-05-0900:00:00| PUSC_RES|单纯形| NORTH_AL| UE220034| 1
|20-05-0900:00:00| PUSC_RES|单纯形| SOUTH_AL| UE220034| 2
|20-05-0900:00:00| TESC_RES|单纯形| NORTH_AL| UE220057| 3
每周:
|拍摄时间|节点|频道 |地点|大头钉|计数
|20-05-0900:00:00| PUSC_RES|单纯形| NORTH_AL| UE220034| 1
每月:
|拍摄时间|节点|频道 |地点|大头钉|计数
|20-05-0900:00:00| PUSC_RES|单纯形| NORTH_AL| UE220034| 1
Spark 为日期操作提供了一个相对丰富的库。您的问题的答案是提取日期部分和显示日期格式的组合。
我重新创建了你的数据如下:
val capturesRaw = spark.read
.option("ignoreLeadingWhiteSpace", "true")
.option("ignoreTrailingWhiteSpace", "true")
.option("delimiter", "|")
.option("header", "true")
.csv(spark.sparkContext.parallelize("""
CAPTUREDTIME| NODE| CHANNEL | LOCATION| TACK
20-05-09 03:06:21| PUSC_RES| SIMPLEX| NORTH_AL| UE220034
20-05-09 04:33:04| PUSC_RES| SIMPLEX| SOUTH_AL| UE220034
20-05-09 12:04:52| TESC_RES| SIMPLEX| NORTH_AL| UE220057
20-05-10 04:24:09| TESC_RES| SIMPLEX| NORTH_AL| UE220057
20-05-10 04:33:04| PUSC_RES| SIMPLEX| SOUTH_AL| UE220034
20-04-09 10:57:48| TESC_RES| SIMPLEX| NORTH_AL| UE220057
20-04-09 12:12:26| TESC_RES| SIMPLEX| NORTH_AL| UE220057
20-04-09 03:26:33| PUSC_RES| SIMPLEX| NORTH_AL| UE220071"""
.split("\n")).toDS)
注意:我用的是Scala,但是代码差别很小希望你能看懂。我相信开头的 val
实际上是唯一的区别。
我假设前两位数字代表两位数的年份?要继续,我们需要确保 capturedtime 是一个时间戳。我更喜欢使用 SQL 来操作数据帧,因为我发现它更具可读性。
spark.sql("""select to_timestamp('20' || capturedtime) capturedtime, NODE, CHANNEL,
LOCATION, TACK from captures_raw""")
.createOrReplaceTempView("captures_raw")
如果您愿意,可以直接在数据帧上完成同样的事情
capturesRaw.withColumn("capturedtimestamp",
to_timestamp(col("capturedtime"), "yy-MM-dd hh:mm:ss"))
此时,我们可以创建您请求的字段:
spark.sql("""select capturedtime,
month(capturedtime) cap_month,
weekofyear(capturedtime) cap_week,
day(capturedtime) cap_day,
hour(capturedtime) cap_hr, NODE, CHANNEL, LOCATION, TACK
from captures_raw""").createOrReplaceTempView("captures")
创建字段后,我们就可以回答您的问题了。例如,要单独按月汇总(不包括其余时间戳),请按以下步骤操作:
spark.sql("""select date_format(capturedtime, "yyyy-MM") year_month, cap_month,
cap_week, cap_day, cap_hr, count(*) count
from captures
group by 1,2,3,4,5""").show
哪个returns
+----------+---------+--------+-------+------+-----+
|year_month|cap_month|cap_week|cap_day|cap_hr|count|
+----------+---------+--------+-------+------+-----+
| 2020-04| 4| 15| 9| 3| 1|
| 2020-04| 4| 15| 9| 10| 1|
| 2020-05| 5| 19| 9| 4| 1|
| 2020-05| 5| 19| 9| 12| 1|
| 2020-04| 4| 15| 9| 12| 1|
| 2020-05| 5| 19| 9| 3| 1|
| 2020-05| 5| 19| 10| 4| 2|
+----------+---------+--------+-------+------+-----+
每天的总结可以这样产生:
spark.sql("""select date_format(capturedtime, "yyyy-MM-dd") captured_date,
cap_day, cap_hr, count(*) count
from captures
group by 1,2,3""").show
+-------------+-------+------+-----+
|captured_date|cap_day|cap_hr|count|
+-------------+-------+------+-----+
| 2020-05-10| 10| 4| 2|
| 2020-04-09| 9| 12| 1|
| 2020-05-09| 9| 4| 1|
| 2020-05-09| 9| 12| 1|
| 2020-04-09| 9| 3| 1|
| 2020-04-09| 9| 10| 1|
| 2020-05-09| 9| 3| 1|
+-------------+-------+------+-----+
您有两种方法来回答您的问题,要么将时间戳转换为您想要分组的日期粒度,要么(如您在评论中所说)使用 sql window 函数按您喜欢的时间间隔分组。
只知道不能通过 Spark 中的 window SQL 函数进行每月汇总。
在这里你可以看到代码,前三个示例使用 window SQL 函数,最后一个示例按月转换时间戳,然后按每一列分组。
df = spark.createDataFrame(
[
("20-05-09 03:06:21", "PUSC_RES", "SIMPLEX", "NORTH_AL", "UE220034"),
("20-05-09 04:33:04", "PUSC_RES", "SIMPLEX", "SOUTH_AL", "UE220034"),
("20-05-09 12:04:52", "TESC_RES", "SIMPLEX", "NORTH_AL", "UE220057"),
("20-05-10 04:24:09", "TESC_RES", "SIMPLEX", "NORTH_AL", "UE220057"),
("20-05-10 04:33:04", "PUSC_RES", "SIMPLEX", "SOUTH_AL", "UE220034"),
("20-04-09 10:57:48", "TESC_RES", "SIMPLEX", "NORTH_AL", "UE220057"),
("20-04-09 12:12:26", "TESC_RES", "SIMPLEX", "NORTH_AL", "UE220057"),
("20-04-09 03:26:33", "PUSC_RES", "SIMPLEX", "NORTH_AL", "UE220071")
],
['CAPTUREDTIME', 'NODE', 'CHANNEL', 'LOCATION', 'TACK']
)
from pyspark.sql.functions import col, count, date_format, date_sub, date_trunc, month, next_day, to_timestamp, weekofyear, window, year
每小时
这个我还是保持window的逻辑,这样大家可以参考一下Spark中的各种可能性。在显示数据帧之前,我只 select 最后 window 的开头。
hourly = (
df
.withColumn("captured_time", to_timestamp(col('CAPTUREDTIME'), 'yy-MM-dd HH:mm:ss'))
.groupBy(window(col("captured_time"), "1 hour").alias("captured_time"), "NODE", "CHANNEL", "LOCATION", "TACK")
.agg(count("*"))
.withColumn("captured_time_hour", col("captured_time.start"))
.drop("captured_time")
)
hourly.sort("captured_time_hour").show(100, False)
每天
通过date_trunc
函数,我可以只考虑日期来截断时间戳
daily = (
df
.withColumn("captured_time", to_timestamp(col('CAPTUREDTIME'), 'yy-MM-dd HH:mm:ss'))
.withColumn("captured_time_day", date_trunc("day", col("captured_time")))
.groupBy("captured_time_day", "NODE", "CHANNEL", "LOCATION", "TACK")
.agg(count("*"))
)
daily.sort("captured_time_day").show(100, False)
每周
这个有点棘手。首先,我使用星期一的 next_day
函数。如果您将周日视为一周的开始,请根据它更新此代码,但我将周一视为一周的开始(这取决于 SQL 我相信的方言和地区)
然后我们还可以添加一个weekofyear
函数来根据需要检索周数
weekly = (
df
.withColumn("captured_time", to_timestamp(col('CAPTUREDTIME'), 'yy-MM-dd HH:mm:ss'))
.withColumn("start_day", date_sub(next_day(col("captured_time"), "monday"), 7))
.groupBy("start_day", "NODE", "CHANNEL", "LOCATION", "TACK")
.agg(count("*"))
.withColumn("start_day", to_timestamp(col("start_day")))
.withColumn("week_of_year", weekofyear(col("start_day")))
)
weekly.sort("start_day").show(100, False)
每月
我们只是将时间戳格式化为日期,然后将其转换回时间戳。这样做只是为了展示另一种方法。我们可以将时间戳截断为日常用例。我还展示了两种提取月份名称和缩写的方法。请注意您的 Spark 版本,因为它已在 Spark 3.0.0
中进行测试monthly = (
df
.withColumn("captured_time", to_timestamp(col('CAPTUREDTIME'), 'yy-MM-dd HH:mm:ss'))
.withColumn("captured_time_month", date_format(col('captured_time'), '1/M/yyyy'))
.groupBy(col("captured_time_month"), "NODE", "CHANNEL", "LOCATION", "TACK")
.agg(count("*").alias("Count TACK"))
.withColumn("captured_time_month", to_timestamp(col("captured_time_month"), '1/M/yyyy'))
.withColumn("month", month(col("captured_time_month")))
.withColumn("month_abbr", date_format(col("captured_time_month"),'MMM'))
.withColumn("full_month_name", date_format(col("captured_time_month"),'MMMM'))
)
monthly.sort("captured_time_month").show(100, False)
再见!