GroupBy 在 PySpark DataFrame 问题上使用时间频率

GroupBy using Time Frequency on PySpark DataFrame Issue

我是 PySpark 的新手。

我正在尝试执行 GroupBy 操作来获取聚合计数。但我无法根据时间频率执行 groupBy。我需要使用字段“CAPTUREDTIME、NODE、CHANNEL、LOCATION、TACK”执行“groupBy”。但是在这个 groupBy 中,我应该使用“CAPTUREDTIME”字段基于“每小时”、“每天”、“每周”、“每月”进行分组。

请查找以下样本数据。

-----------------+------+------+--------+----------+--------------

|CAPTUREDTIME|      NODE|       CHANNEL  |  LOCATION|    TACK

+-----------------+------+------+--------+----------+-------------

|20-05-09 03:06:21|   PUSC_RES|   SIMPLEX|  NORTH_AL|    UE220034

|20-05-09 04:33:04|   PUSC_RES|   SIMPLEX|  SOUTH_AL|    UE220034

|20-05-09 12:04:52|   TESC_RES|   SIMPLEX|  NORTH_AL|    UE220057

|20-05-10 04:24:09|   TESC_RES|   SIMPLEX|  NORTH_AL|    UE220057

|20-05-10 04:33:04|   PUSC_RES|   SIMPLEX|  SOUTH_AL|    UE220034

|20-04-09 10:57:48|   TESC_RES|   SIMPLEX|  NORTH_AL|    UE220057

|20-04-09 12:12:26|   TESC_RES|   SIMPLEX|  NORTH_AL|    UE220057

|20-04-09 03:26:33|   PUSC_RES|   SIMPLEX|  NORTH_AL|    UE220071

+-----------------+------+------+--------+----------+------------- 

我使用了下面的pyspark代码

df = df.groupby("CAPTUREDTIME", "NODE", "CHANNEL", "LOCATION", "TACK").agg(
    func.count("TACK").alias("count")
)

如何扩展上述代码以在 'hourly'、'daily'、'weekly'、'monthly' 上分组?

我需要以下格式的输出(共享示例输出):

每小时:

|拍摄时间|节点|频道 |地点|大头钉|计数

|20-05-0903:00:00| PUSC_RES|单纯形| NORTH_AL| UE220034| 2

|20-05-0904:00:00| PUSC_RES|单纯形| SOUTH_AL| UE220034| 2

每日:

|拍摄时间|节点|频道 |地点|大头钉|计数

|20-05-0900:00:00| PUSC_RES|单纯形| NORTH_AL| UE220034| 1

|20-05-0900:00:00| PUSC_RES|单纯形| SOUTH_AL| UE220034| 2

|20-05-0900:00:00| TESC_RES|单纯形| NORTH_AL| UE220057| 3

每周:

|拍摄时间|节点|频道 |地点|大头钉|计数

|20-05-0900:00:00| PUSC_RES|单纯形| NORTH_AL| UE220034| 1

每月:

|拍摄时间|节点|频道 |地点|大头钉|计数

|20-05-0900:00:00| PUSC_RES|单纯形| NORTH_AL| UE220034| 1

Spark 为日期操作提供了一个相对丰富的库。您的问题的答案是提取日期部分和显示日期格式的组合。

我重新创建了你的数据如下:

val capturesRaw = spark.read
  .option("ignoreLeadingWhiteSpace", "true")
  .option("ignoreTrailingWhiteSpace", "true")
  .option("delimiter", "|")
  .option("header", "true")
  .csv(spark.sparkContext.parallelize("""
      CAPTUREDTIME|      NODE|       CHANNEL  |  LOCATION|    TACK
      20-05-09 03:06:21|   PUSC_RES|   SIMPLEX|  NORTH_AL|    UE220034
      20-05-09 04:33:04|   PUSC_RES|   SIMPLEX|  SOUTH_AL|    UE220034
      20-05-09 12:04:52|   TESC_RES|   SIMPLEX|  NORTH_AL|    UE220057
      20-05-10 04:24:09|   TESC_RES|   SIMPLEX|  NORTH_AL|    UE220057
      20-05-10 04:33:04|   PUSC_RES|   SIMPLEX|  SOUTH_AL|    UE220034
      20-04-09 10:57:48|   TESC_RES|   SIMPLEX|  NORTH_AL|    UE220057
      20-04-09 12:12:26|   TESC_RES|   SIMPLEX|  NORTH_AL|    UE220057
      20-04-09 03:26:33|   PUSC_RES|   SIMPLEX|  NORTH_AL|    UE220071"""
  .split("\n")).toDS)

注意:我用的是Scala,但是代码差别很小希望你能看懂。我相信开头的 val 实际上是唯一的区别。

我假设前两位数字代表两位数的年份?要继续,我们需要确保 capturedtime 是一个时间戳。我更喜欢使用 SQL 来操作数据帧,因为我发现它更具可读性。

spark.sql("""select to_timestamp('20' || capturedtime) capturedtime, NODE, CHANNEL, 
             LOCATION, TACK from captures_raw""")
    .createOrReplaceTempView("captures_raw")

如果您愿意,可以直接在数据帧上完成同样的事情

capturesRaw.withColumn("capturedtimestamp",
        to_timestamp(col("capturedtime"), "yy-MM-dd hh:mm:ss"))

此时,我们可以创建您请求的字段:

spark.sql("""select capturedtime,
           month(capturedtime) cap_month, 
           weekofyear(capturedtime) cap_week,   
           day(capturedtime) cap_day, 
           hour(capturedtime) cap_hr, NODE, CHANNEL, LOCATION, TACK 
           from captures_raw""").createOrReplaceTempView("captures")    

创建字段后,我们就可以回答您的问题了。例如,要单独按月汇总(不包括其余时间戳),请按以下步骤操作:

spark.sql("""select date_format(capturedtime, "yyyy-MM") year_month, cap_month,
             cap_week, cap_day, cap_hr, count(*) count
             from captures 
             group by 1,2,3,4,5""").show

哪个returns

+----------+---------+--------+-------+------+-----+
|year_month|cap_month|cap_week|cap_day|cap_hr|count|
+----------+---------+--------+-------+------+-----+
|   2020-04|        4|      15|      9|     3|    1|
|   2020-04|        4|      15|      9|    10|    1|
|   2020-05|        5|      19|      9|     4|    1|
|   2020-05|        5|      19|      9|    12|    1|
|   2020-04|        4|      15|      9|    12|    1|
|   2020-05|        5|      19|      9|     3|    1|
|   2020-05|        5|      19|     10|     4|    2|
+----------+---------+--------+-------+------+-----+

每天的总结可以这样产生:

spark.sql("""select date_format(capturedtime, "yyyy-MM-dd") captured_date,
             cap_day, cap_hr, count(*) count
             from captures 
             group by 1,2,3""").show

+-------------+-------+------+-----+
|captured_date|cap_day|cap_hr|count|
+-------------+-------+------+-----+
|   2020-05-10|     10|     4|    2|
|   2020-04-09|      9|    12|    1|
|   2020-05-09|      9|     4|    1|
|   2020-05-09|      9|    12|    1|
|   2020-04-09|      9|     3|    1|
|   2020-04-09|      9|    10|    1|
|   2020-05-09|      9|     3|    1|
+-------------+-------+------+-----+

您有两种方法来回答您的问题,要么将时间戳转换为您想要分组的日期粒度,要么(如您在评论中所说)使用 sql window 函数按您喜欢的时间间隔分组。

只知道不能通过 Spark 中的 window SQL 函数进行每月汇总。

在这里你可以看到代码,前三个示例使用 window SQL 函数,最后一个示例按月转换时间戳,然后按每一列分组。

df = spark.createDataFrame(
    [
        ("20-05-09 03:06:21", "PUSC_RES", "SIMPLEX", "NORTH_AL", "UE220034"),
        ("20-05-09 04:33:04", "PUSC_RES", "SIMPLEX", "SOUTH_AL", "UE220034"),
        ("20-05-09 12:04:52", "TESC_RES", "SIMPLEX", "NORTH_AL", "UE220057"),
        ("20-05-10 04:24:09", "TESC_RES", "SIMPLEX", "NORTH_AL", "UE220057"),
        ("20-05-10 04:33:04", "PUSC_RES", "SIMPLEX", "SOUTH_AL", "UE220034"),
        ("20-04-09 10:57:48", "TESC_RES", "SIMPLEX", "NORTH_AL", "UE220057"),
        ("20-04-09 12:12:26", "TESC_RES", "SIMPLEX", "NORTH_AL", "UE220057"),
        ("20-04-09 03:26:33", "PUSC_RES", "SIMPLEX", "NORTH_AL", "UE220071")
    ],
    ['CAPTUREDTIME', 'NODE', 'CHANNEL', 'LOCATION', 'TACK']
)

from pyspark.sql.functions import col, count, date_format, date_sub, date_trunc, month, next_day, to_timestamp, weekofyear, window, year

每小时

这个我还是保持window的逻辑,这样大家可以参考一下Spark中的各种可能性。在显示数据帧之前,我只 select 最后 window 的开头。

hourly = (
    df
    .withColumn("captured_time", to_timestamp(col('CAPTUREDTIME'), 'yy-MM-dd HH:mm:ss'))
    .groupBy(window(col("captured_time"), "1 hour").alias("captured_time"), "NODE", "CHANNEL", "LOCATION", "TACK")
    .agg(count("*"))
    .withColumn("captured_time_hour", col("captured_time.start"))
    .drop("captured_time")
)
hourly.sort("captured_time_hour").show(100, False)

每天

通过date_trunc函数,我可以只考虑日期来截断时间戳

daily = (
    df
    .withColumn("captured_time", to_timestamp(col('CAPTUREDTIME'), 'yy-MM-dd HH:mm:ss'))
    .withColumn("captured_time_day", date_trunc("day", col("captured_time")))
    .groupBy("captured_time_day", "NODE", "CHANNEL", "LOCATION", "TACK")
    .agg(count("*"))
)
daily.sort("captured_time_day").show(100, False)

每周

这个有点棘手。首先,我使用星期一的 next_day 函数。如果您将周日视为一周的开始,请根据它更新此代码,但我将周一视为一周的开始(这取决于 SQL 我相信的方言和地区)

然后我们还可以添加一个weekofyear函数来根据需要检索周数

weekly = (
    df
    .withColumn("captured_time", to_timestamp(col('CAPTUREDTIME'), 'yy-MM-dd HH:mm:ss'))
    .withColumn("start_day", date_sub(next_day(col("captured_time"), "monday"), 7))
    .groupBy("start_day", "NODE", "CHANNEL", "LOCATION", "TACK")
    .agg(count("*"))
    .withColumn("start_day", to_timestamp(col("start_day")))
    .withColumn("week_of_year", weekofyear(col("start_day")))
)
weekly.sort("start_day").show(100, False)

每月

我们只是将时间戳格式化为日期,然后将其转换回时间戳。这样做只是为了展示另一种方法。我们可以将时间戳截断为日常用例。我还展示了两种提取月份名称和缩写的方法。请注意您的 Spark 版本,因为它已在 Spark 3.0.0

中进行测试
monthly = (
    df
    .withColumn("captured_time", to_timestamp(col('CAPTUREDTIME'), 'yy-MM-dd HH:mm:ss'))
    .withColumn("captured_time_month", date_format(col('captured_time'), '1/M/yyyy'))
    .groupBy(col("captured_time_month"), "NODE", "CHANNEL", "LOCATION", "TACK")
    .agg(count("*").alias("Count TACK"))
    .withColumn("captured_time_month", to_timestamp(col("captured_time_month"), '1/M/yyyy'))
    .withColumn("month", month(col("captured_time_month")))
    .withColumn("month_abbr", date_format(col("captured_time_month"),'MMM'))
    .withColumn("full_month_name", date_format(col("captured_time_month"),'MMMM'))
)
monthly.sort("captured_time_month").show(100, False)

再见!