对于 groupby 中 Spark 数据帧中缺失的记录,用 Null[=10=] 填充所有列缺失信息的最佳方法是什么?
What is the best way to fill missing info on all columns with Null\0 for missing records in Spark dataframe while groupby?
假设我有以下 Spark 框架:
+--------+----------+-----------+-------------------+-------------------+
|UserName|date |NoLogPerDay|NoLogPer-1st-12-hrs|NoLogPer-2nd-12-hrs|
+--------+----------+-----------+-------------------+-------------------+
|B |2021-08-11|2 |2 |0 |
|A |2021-08-11|3 |2 |1 |
|B |2021-08-13|1 |1 |0 |
+--------+----------+-----------+-------------------+-------------------+
现在我不仅要用正确的日期来估算 date
列中缺失的日期,以便数据框保持其连续的时间序列性质和相同顺序的帧,而且还要用 [=16= 估算其他列] 或 0
(最好是 groupBy)。
我的代码如下:
import time
import datetime as dt
from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, TimestampType, DateType
dict2 = [("2021-08-11 04:05:06", "A"),
("2021-08-11 04:15:06", "B"),
("2021-08-11 09:15:26", "A"),
("2021-08-11 11:04:06", "B"),
("2021-08-11 14:55:16", "A"),
("2021-08-13 04:12:11", "B"),
]
schema = StructType([
StructField("timestamp", StringType(), True), \
StructField("UserName", StringType(), True), \
])
#create a Spark dataframe
sqlCtx = SQLContext(sc)
sdf = sqlCtx.createDataFrame(data=dict2,schema=schema)
#sdf.printSchema()
#sdf.show(truncate=False)
#+-------------------+--------+
#|timestamp |UserName|
#+-------------------+--------+
#|2021-08-11 04:05:06|A |
#|2021-08-11 04:15:06|B |
#|2021-08-11 09:15:26|A |
#|2021-08-11 11:04:06|B |
#|2021-08-11 14:55:16|A |
#|2021-08-13 04:12:11|B |
#+-------------------+--------+
#Generate date and timestamp
sdf1 = sdf.withColumn('timestamp', F.to_timestamp("timestamp", "yyyy-MM-dd HH:mm:ss").cast(TimestampType())) \
.withColumn('date', F.to_date("timestamp", "yyyy-MM-dd").cast(DateType())) \
.select('timestamp', 'date', 'UserName')
#sdf1.show(truncate = False)
#+-------------------+----------+--------+
#|timestamp |date |UserName|
#+-------------------+----------+--------+
#|2021-08-11 04:05:06|2021-08-11|A |
#|2021-08-11 04:15:06|2021-08-11|B |
#|2021-08-11 09:15:26|2021-08-11|A |
#|2021-08-11 11:04:06|2021-08-11|B |
#|2021-08-11 14:55:16|2021-08-11|A |
#|2021-08-13 04:12:11|2021-08-13|B |
#+-------------------+----------+--------+
#Aggeragate records numbers for specific features (Username) for certain time-resolution PerDay(24hrs), HalfDay(2x12hrs)
df = sdf1.groupBy("UserName", "date").agg(
F.sum(F.hour("timestamp").between(0, 24).cast("int")).alias("NoLogPerDay"),
F.sum(F.hour("timestamp").between(0, 11).cast("int")).alias("NoLogPer-1st-12-hrs"),
F.sum(F.hour("timestamp").between(12, 23).cast("int")).alias("NoLogPer-2nd-12-hrs"),
).sort('date')
df.show(truncate = False)
问题是当我在 date
和 UserName
上分组时,我错过了一些用户 B
有活动但用户 A
没有活动的日期,反之亦然。因此,我有兴趣通过重新填充这些日期(不需要时间戳)并将 0
分配给这些列来反映 Spark 数据框中的这些无活动。我不确定是否可以在分组时或之前或之后执行此操作!
我已经检查了一些相关的 as well as PySpark offers window functions and inspired this answer 所以直到现在我已经尝试过这个:
# compute the list of all dates from available dates
max_date = sdf1.select(F.max('date')).first()['max(date)']
min_date = sdf1.select(F.min('date')).first()['min(date)']
print(min_date) #2021-08-11
print(max_date) #2021-08-13
#compute list of available dates based on min_date & max_date from available data
dates_list = [max_date - dt.timedelta(days=x) for x in range((max_date - min_date).days +1)]
print(dates_list)
#create a temporaray Spark dataframe for date column includng missing dates with interval 1 day
sqlCtx = SQLContext(sc)
df2 = sqlCtx.createDataFrame(data=dates_list)
#Apply leftouter join on date column
dff = df2.join(sdf1, ["date"], "leftouter")
#dff.sort('date').show(truncate = False)
#possible to use .withColumn().otherwise()
#.withColumn('date',when(col('date').isNull(),to_date(lit('01.01.1900'),'dd.MM.yyyy')).otherwise(col('date')))
#Replace 0 for null for all integer columns
dfff = dff.na.fill(value=0).sort('date')
dfff.select('date','Username', 'NoLogPerDay','NoLogPer-1st-12-hrs','NoLogPer-2nd-12-hrs').sort('date').show(truncate = False)
请注意,我对使用 UDF
或通过 toPandas()
进行破解不感兴趣
所以在 groupBy 之后预期的结果应该如下所示:
+--------+----------+-----------+-------------------+-------------------+
|UserName|date |NoLogPerDay|NoLogPer-1st-12-hrs|NoLogPer-2nd-12-hrs|
+--------+----------+-----------+-------------------+-------------------+
|B |2021-08-11|2 |2 |0 |
|A |2021-08-11|3 |2 |1 |
|B |2021-08-12|0 |0 |0 | <--
|A |2021-08-12|0 |0 |0 | <--
|B |2021-08-13|1 |1 |0 |
|A |2021-08-13|0 |0 |0 | <--
+--------+----------+-----------+-------------------+-------------------+
这是一种方法:
首先,生成新数据框 all_dates_df
,其中包含分组 df
中从最小日期到最大日期的日期序列。为此,您可以使用 sequence
函数:
import pyspark.sql.functions as F
all_dates_df = df.selectExpr(
"sequence(min(date), max(date), interval 1 day) as date"
).select(F.explode("date").alias("date"))
all_dates_df.show()
#+----------+
#| date|
#+----------+
#|2021-08-11|
#|2021-08-12|
#|2021-08-13|
#+----------+
现在,您需要使用具有不同 UserName
数据框的交叉连接为所有用户复制每个日期,最后与分组的 df
连接以获得所需的输出:
result_df = all_dates_df.crossJoin(
df.select("UserName").distinct()
).join(
df,
["UserName", "date"],
"left"
).fillna(0)
result_df.show()
#+--------+----------+-----------+-------------------+-------------------+
#|UserName| date|NoLogPerDay|NoLogPer-1st-12-hrs|NoLogPer-2nd-12-hrs|
#+--------+----------+-----------+-------------------+-------------------+
#| A|2021-08-11| 3| 2| 1|
#| B|2021-08-11| 2| 2| 0|
#| A|2021-08-12| 0| 0| 0|
#| B|2021-08-12| 0| 0| 0|
#| B|2021-08-13| 1| 1| 0|
#| A|2021-08-13| 0| 0| 0|
#+--------+----------+-----------+-------------------+-------------------+
从本质上讲,您可以生成所有可能的选项并加入其中以实现您错过的日期。
sequence sql 函数在这里可能有助于生成所有可能的日期。您可以将您的最小和最大日期以及您希望它递增的时间间隔传递给它。以下示例继续使用您的 google 协作中的代码。
使用函数min
、max
、collect_set
and table generating functions explode
您可以实现以下目标:
possible_user_dates=(
# Step 1 - Get all possible UserNames and desired dates
df.select(
F.collect_set("UserName").alias("UserName"),
F.expr("sequence(min(date),max(date), interval 1 day)").alias("date")
)
# Step 2 - Use explode to split the collected arrays into rows (ouput immediately below)
.withColumn("UserName",F.explode("UserName"))
.withColumn("date",F.explode("date"))
.distinct()
)
possible_user_dates.show(truncate=False)
+--------+----------+
|UserName|date |
+--------+----------+
|B |2021-08-11|
|A |2021-08-11|
|B |2021-08-12|
|A |2021-08-12|
|B |2021-08-13|
|A |2021-08-13|
+--------+----------+
执行左连接
final_df = (
possible_user_dates.join(
df,
["UserName","date"],
"left"
)
# Since the left join will place NULLs where values are missing.
# Eg. where a User was not active on a particular date
# We use `fill` to replace the null values with `0`
.na.fill(0)
)
final_df.show(truncate=False)
+--------+----------+-----------+-------------------+-------------------+
|UserName|date |NoLogPerDay|NoLogPer-1st-12-hrs|NoLogPer-2nd-12-hrs|
+--------+----------+-----------+-------------------+-------------------+
|B |2021-08-11|2 |2 |0 |
|A |2021-08-11|3 |2 |1 |
|B |2021-08-12|0 |0 |0 |
|A |2021-08-12|0 |0 |0 |
|B |2021-08-13|1 |1 |0 |
|A |2021-08-13|0 |0 |0 |
+--------+----------+-----------+-------------------+-------------------+
出于调试目的,我包含了一些中间步骤的输出
步骤 1 输出:
df.select(
F.collect_set("UserName").alias("UserName"),
F.expr("sequence(min(date),max(date), interval 1 day)").alias("date")
).show(truncate=False)
+--------+------------------------------------+
|UserName|date |
+--------+------------------------------------+
|[B, A] |[2021-08-11, 2021-08-12, 2021-08-13]|
+--------+------------------------------------+
假设我有以下 Spark 框架:
+--------+----------+-----------+-------------------+-------------------+
|UserName|date |NoLogPerDay|NoLogPer-1st-12-hrs|NoLogPer-2nd-12-hrs|
+--------+----------+-----------+-------------------+-------------------+
|B |2021-08-11|2 |2 |0 |
|A |2021-08-11|3 |2 |1 |
|B |2021-08-13|1 |1 |0 |
+--------+----------+-----------+-------------------+-------------------+
现在我不仅要用正确的日期来估算 date
列中缺失的日期,以便数据框保持其连续的时间序列性质和相同顺序的帧,而且还要用 [=16= 估算其他列] 或 0
(最好是 groupBy)。
我的代码如下:
import time
import datetime as dt
from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, TimestampType, DateType
dict2 = [("2021-08-11 04:05:06", "A"),
("2021-08-11 04:15:06", "B"),
("2021-08-11 09:15:26", "A"),
("2021-08-11 11:04:06", "B"),
("2021-08-11 14:55:16", "A"),
("2021-08-13 04:12:11", "B"),
]
schema = StructType([
StructField("timestamp", StringType(), True), \
StructField("UserName", StringType(), True), \
])
#create a Spark dataframe
sqlCtx = SQLContext(sc)
sdf = sqlCtx.createDataFrame(data=dict2,schema=schema)
#sdf.printSchema()
#sdf.show(truncate=False)
#+-------------------+--------+
#|timestamp |UserName|
#+-------------------+--------+
#|2021-08-11 04:05:06|A |
#|2021-08-11 04:15:06|B |
#|2021-08-11 09:15:26|A |
#|2021-08-11 11:04:06|B |
#|2021-08-11 14:55:16|A |
#|2021-08-13 04:12:11|B |
#+-------------------+--------+
#Generate date and timestamp
sdf1 = sdf.withColumn('timestamp', F.to_timestamp("timestamp", "yyyy-MM-dd HH:mm:ss").cast(TimestampType())) \
.withColumn('date', F.to_date("timestamp", "yyyy-MM-dd").cast(DateType())) \
.select('timestamp', 'date', 'UserName')
#sdf1.show(truncate = False)
#+-------------------+----------+--------+
#|timestamp |date |UserName|
#+-------------------+----------+--------+
#|2021-08-11 04:05:06|2021-08-11|A |
#|2021-08-11 04:15:06|2021-08-11|B |
#|2021-08-11 09:15:26|2021-08-11|A |
#|2021-08-11 11:04:06|2021-08-11|B |
#|2021-08-11 14:55:16|2021-08-11|A |
#|2021-08-13 04:12:11|2021-08-13|B |
#+-------------------+----------+--------+
#Aggeragate records numbers for specific features (Username) for certain time-resolution PerDay(24hrs), HalfDay(2x12hrs)
df = sdf1.groupBy("UserName", "date").agg(
F.sum(F.hour("timestamp").between(0, 24).cast("int")).alias("NoLogPerDay"),
F.sum(F.hour("timestamp").between(0, 11).cast("int")).alias("NoLogPer-1st-12-hrs"),
F.sum(F.hour("timestamp").between(12, 23).cast("int")).alias("NoLogPer-2nd-12-hrs"),
).sort('date')
df.show(truncate = False)
问题是当我在 date
和 UserName
上分组时,我错过了一些用户 B
有活动但用户 A
没有活动的日期,反之亦然。因此,我有兴趣通过重新填充这些日期(不需要时间戳)并将 0
分配给这些列来反映 Spark 数据框中的这些无活动。我不确定是否可以在分组时或之前或之后执行此操作!
我已经检查了一些相关的
# compute the list of all dates from available dates
max_date = sdf1.select(F.max('date')).first()['max(date)']
min_date = sdf1.select(F.min('date')).first()['min(date)']
print(min_date) #2021-08-11
print(max_date) #2021-08-13
#compute list of available dates based on min_date & max_date from available data
dates_list = [max_date - dt.timedelta(days=x) for x in range((max_date - min_date).days +1)]
print(dates_list)
#create a temporaray Spark dataframe for date column includng missing dates with interval 1 day
sqlCtx = SQLContext(sc)
df2 = sqlCtx.createDataFrame(data=dates_list)
#Apply leftouter join on date column
dff = df2.join(sdf1, ["date"], "leftouter")
#dff.sort('date').show(truncate = False)
#possible to use .withColumn().otherwise()
#.withColumn('date',when(col('date').isNull(),to_date(lit('01.01.1900'),'dd.MM.yyyy')).otherwise(col('date')))
#Replace 0 for null for all integer columns
dfff = dff.na.fill(value=0).sort('date')
dfff.select('date','Username', 'NoLogPerDay','NoLogPer-1st-12-hrs','NoLogPer-2nd-12-hrs').sort('date').show(truncate = False)
请注意,我对使用 UDF
或通过 toPandas()
所以在 groupBy 之后预期的结果应该如下所示:
+--------+----------+-----------+-------------------+-------------------+
|UserName|date |NoLogPerDay|NoLogPer-1st-12-hrs|NoLogPer-2nd-12-hrs|
+--------+----------+-----------+-------------------+-------------------+
|B |2021-08-11|2 |2 |0 |
|A |2021-08-11|3 |2 |1 |
|B |2021-08-12|0 |0 |0 | <--
|A |2021-08-12|0 |0 |0 | <--
|B |2021-08-13|1 |1 |0 |
|A |2021-08-13|0 |0 |0 | <--
+--------+----------+-----------+-------------------+-------------------+
这是一种方法:
首先,生成新数据框 all_dates_df
,其中包含分组 df
中从最小日期到最大日期的日期序列。为此,您可以使用 sequence
函数:
import pyspark.sql.functions as F
all_dates_df = df.selectExpr(
"sequence(min(date), max(date), interval 1 day) as date"
).select(F.explode("date").alias("date"))
all_dates_df.show()
#+----------+
#| date|
#+----------+
#|2021-08-11|
#|2021-08-12|
#|2021-08-13|
#+----------+
现在,您需要使用具有不同 UserName
数据框的交叉连接为所有用户复制每个日期,最后与分组的 df
连接以获得所需的输出:
result_df = all_dates_df.crossJoin(
df.select("UserName").distinct()
).join(
df,
["UserName", "date"],
"left"
).fillna(0)
result_df.show()
#+--------+----------+-----------+-------------------+-------------------+
#|UserName| date|NoLogPerDay|NoLogPer-1st-12-hrs|NoLogPer-2nd-12-hrs|
#+--------+----------+-----------+-------------------+-------------------+
#| A|2021-08-11| 3| 2| 1|
#| B|2021-08-11| 2| 2| 0|
#| A|2021-08-12| 0| 0| 0|
#| B|2021-08-12| 0| 0| 0|
#| B|2021-08-13| 1| 1| 0|
#| A|2021-08-13| 0| 0| 0|
#+--------+----------+-----------+-------------------+-------------------+
从本质上讲,您可以生成所有可能的选项并加入其中以实现您错过的日期。
sequence sql 函数在这里可能有助于生成所有可能的日期。您可以将您的最小和最大日期以及您希望它递增的时间间隔传递给它。以下示例继续使用您的 google 协作中的代码。
使用函数min
、max
、collect_set
and table generating functions explode
您可以实现以下目标:
possible_user_dates=(
# Step 1 - Get all possible UserNames and desired dates
df.select(
F.collect_set("UserName").alias("UserName"),
F.expr("sequence(min(date),max(date), interval 1 day)").alias("date")
)
# Step 2 - Use explode to split the collected arrays into rows (ouput immediately below)
.withColumn("UserName",F.explode("UserName"))
.withColumn("date",F.explode("date"))
.distinct()
)
possible_user_dates.show(truncate=False)
+--------+----------+
|UserName|date |
+--------+----------+
|B |2021-08-11|
|A |2021-08-11|
|B |2021-08-12|
|A |2021-08-12|
|B |2021-08-13|
|A |2021-08-13|
+--------+----------+
执行左连接
final_df = (
possible_user_dates.join(
df,
["UserName","date"],
"left"
)
# Since the left join will place NULLs where values are missing.
# Eg. where a User was not active on a particular date
# We use `fill` to replace the null values with `0`
.na.fill(0)
)
final_df.show(truncate=False)
+--------+----------+-----------+-------------------+-------------------+
|UserName|date |NoLogPerDay|NoLogPer-1st-12-hrs|NoLogPer-2nd-12-hrs|
+--------+----------+-----------+-------------------+-------------------+
|B |2021-08-11|2 |2 |0 |
|A |2021-08-11|3 |2 |1 |
|B |2021-08-12|0 |0 |0 |
|A |2021-08-12|0 |0 |0 |
|B |2021-08-13|1 |1 |0 |
|A |2021-08-13|0 |0 |0 |
+--------+----------+-----------+-------------------+-------------------+
出于调试目的,我包含了一些中间步骤的输出
步骤 1 输出:
df.select(
F.collect_set("UserName").alias("UserName"),
F.expr("sequence(min(date),max(date), interval 1 day)").alias("date")
).show(truncate=False)
+--------+------------------------------------+
|UserName|date |
+--------+------------------------------------+
|[B, A] |[2021-08-11, 2021-08-12, 2021-08-13]|
+--------+------------------------------------+