在 pyspark 中创建带有时间戳和 groupby 的 KPI
Create a KPI with a timestamp and a groupby in pyspark
我有一个包含日志的数据框,就像这个例子一样:
+------------+--------------------------+--------------------+-------------------+
|Source |Error | @timestamp| timestamp_rounded |
+------------+--------------------------+--------------------+-------------------+
| A | No |2021-09-12T14:07:...|2021-09-12 16:10:00|
| B | No |2021-09-12T12:49:...|2021-09-12 14:50:00|
| C | No |2021-09-12T12:59:...|2021-09-12 15:00:00|
| C | No |2021-09-12T12:58:...|2021-09-12 15:00:00|
| B | No |2021-09-12T14:22:...|2021-09-12 16:20:00|
| A | Yes |2021-09-12T14:22:...|2021-09-12 16:25:00|
| B | No |2021-09-12T13:00:...|2021-09-12 15:00:00|
| B | No |2021-09-12T12:57:...|2021-09-12 14:55:00|
| B | No |2021-09-12T12:57:...|2021-09-12 15:00:00|
| B | No |2021-09-12T12:58:...|2021-09-12 15:00:00|
| C | No |2021-09-12T12:54:...|2021-09-12 14:55:00|
| A | Yes |2021-09-12T14:17:...|2021-09-12 16:15:00|
| B | No |2021-09-12T12:43:...|2021-09-12 14:45:00|
| A | No |2021-09-12T12:45:...|2021-09-12 14:45:00|
| D | No |2021-09-12T12:57:...|2021-09-12 14:55:00|
| A | No |2021-09-12T13:00:...|2021-09-12 15:00:00|
| C | No |2021-09-12T12:47:...|2021-09-12 14:45:00|
| A | No |2021-09-12T12:57:...|2021-09-12 15:00:00|
| A | No |2021-09-12T13:00:...|2021-09-12 15:00:00|
| A | No |2021-09-12T14:23:...|2021-09-12 16:25:00|
+------------+--------------------------+--------------------+-------------------+
only showing top 20 rows
我的数据框有数百万条日志,这并不重要。
我想计算每个源的错误率,每 5 分钟。我已经搜索了有关此类转换的文档(groupby with partition?double groupby?...)但我没有找到很多信息。
我可以使用 Yes ==> 1 和 No ==> 0 获得一个新列,然后使用 gorupby
和 {avg: foo}
获得每个来源的平均值以获得每个来源的错误率来源,但我希望它每 5 分钟一次(参见 col 'timestamp_rounded')
结果如下:
+-------------------+------------+--------------+-------------+------------+
|timestamp_rounded |Error_rate_A| Error_rate_B | Error_rate_C|Error_rate_D|
+-------------------+------------+--------------+-------------+------------+
|2021-09-12 16:10:00| 0 | 0.2 | 0 | 0.2 |
|2021-09-12 16:15:00| 0.1 | 0.3 | 0 | 0 |
|2021-09-12 16:20:00| 0 | 0.2 | 0 | 0 |
|2021-09-12 16:25:00| 0 | 0.2 | 0 | 0 |
|2021-09-12 16:30:00| 0 | 0.2 | 0 | 0 |
|2021-09-12 16:35:00| 0.2 | 0.2 | 0 | 0 |
|2021-09-12 16:40:00| 0.3 | 0.2 | 0 | 0.2 |
|2021-09-12 16:45:00| 0.4 | 0.3 | 0 | 0 |
etc...
来源可以非常多(我的示例有 4 个,但可以有数千个来源)
如果您需要更多信息,请告诉我。
非常感谢!
假设您的数据可以在名为 logs
的数据框中访问,您可以通过在 timestamp_rounded
上进行初始分组然后在 source
上进行调整以将汇总的错误率转换为每个 timestamp_rounded
错误率的行和列的每个 source
。最后,您可以用 0.0
替换缺失的错误率值
在执行这些转换之前,我们可以将您的 Yes
/No
值转换为 1
/0
以简化 aggregation/mean 并重命名 source
带有前缀 Error_rate_
的列值,以在数据透视后获得所需的列名称。
NB.我在问题的样本数据中更改了你的1条记录
| A | No |2021-09-12T12:57:...|2021-09-12 15:00:00|
至
| A | Yes |2021-09-12T12:57:...|2021-09-12 15:00:00|
接收更多数据变化。因此,您的数据框在初始聚合后看起来像这样。
您可以使用以下方法实现此目的:
output_df =(
logs.withColumn("Error",F.when(F.col("Error")=="Yes",1).otherwise(0))
.withColumn("Source",F.concat(F.lit("Error_rate_"),F.col("Source")))
.groupBy("timestamp_rounded")
.pivot("Source")
.agg(
F.round(F.mean("Error"),2).alias("Error_rate")
)
.na.fill(0.0)
)
输出
+-------------------+------------+------------+------------+------------+
|timestamp_rounded |Error_rate_A|Error_rate_B|Error_rate_C|Error_rate_D|
+-------------------+------------+------------+------------+------------+
|2021-09-12 14:50:00|0.0 |0.0 |0.0 |0.0 |
|2021-09-12 16:15:00|1.0 |0.0 |0.0 |0.0 |
|2021-09-12 16:20:00|0.0 |0.0 |0.0 |0.0 |
|2021-09-12 16:25:00|0.5 |0.0 |0.0 |0.0 |
|2021-09-12 14:55:00|0.0 |0.0 |0.0 |0.0 |
|2021-09-12 14:45:00|0.0 |0.0 |0.0 |0.0 |
|2021-09-12 16:10:00|0.0 |0.0 |0.0 |0.0 |
|2021-09-12 15:00:00|0.33 |0.0 |0.0 |0.0 |
+-------------------+------------+------------+------------+------------+
NB. 上面的输出未排序,可以使用 .orderBy
轻松排序
让我知道这是否适合你。
我有一个包含日志的数据框,就像这个例子一样:
+------------+--------------------------+--------------------+-------------------+
|Source |Error | @timestamp| timestamp_rounded |
+------------+--------------------------+--------------------+-------------------+
| A | No |2021-09-12T14:07:...|2021-09-12 16:10:00|
| B | No |2021-09-12T12:49:...|2021-09-12 14:50:00|
| C | No |2021-09-12T12:59:...|2021-09-12 15:00:00|
| C | No |2021-09-12T12:58:...|2021-09-12 15:00:00|
| B | No |2021-09-12T14:22:...|2021-09-12 16:20:00|
| A | Yes |2021-09-12T14:22:...|2021-09-12 16:25:00|
| B | No |2021-09-12T13:00:...|2021-09-12 15:00:00|
| B | No |2021-09-12T12:57:...|2021-09-12 14:55:00|
| B | No |2021-09-12T12:57:...|2021-09-12 15:00:00|
| B | No |2021-09-12T12:58:...|2021-09-12 15:00:00|
| C | No |2021-09-12T12:54:...|2021-09-12 14:55:00|
| A | Yes |2021-09-12T14:17:...|2021-09-12 16:15:00|
| B | No |2021-09-12T12:43:...|2021-09-12 14:45:00|
| A | No |2021-09-12T12:45:...|2021-09-12 14:45:00|
| D | No |2021-09-12T12:57:...|2021-09-12 14:55:00|
| A | No |2021-09-12T13:00:...|2021-09-12 15:00:00|
| C | No |2021-09-12T12:47:...|2021-09-12 14:45:00|
| A | No |2021-09-12T12:57:...|2021-09-12 15:00:00|
| A | No |2021-09-12T13:00:...|2021-09-12 15:00:00|
| A | No |2021-09-12T14:23:...|2021-09-12 16:25:00|
+------------+--------------------------+--------------------+-------------------+
only showing top 20 rows
我的数据框有数百万条日志,这并不重要。
我想计算每个源的错误率,每 5 分钟。我已经搜索了有关此类转换的文档(groupby with partition?double groupby?...)但我没有找到很多信息。
我可以使用 Yes ==> 1 和 No ==> 0 获得一个新列,然后使用 gorupby
和 {avg: foo}
获得每个来源的平均值以获得每个来源的错误率来源,但我希望它每 5 分钟一次(参见 col 'timestamp_rounded')
结果如下:
+-------------------+------------+--------------+-------------+------------+
|timestamp_rounded |Error_rate_A| Error_rate_B | Error_rate_C|Error_rate_D|
+-------------------+------------+--------------+-------------+------------+
|2021-09-12 16:10:00| 0 | 0.2 | 0 | 0.2 |
|2021-09-12 16:15:00| 0.1 | 0.3 | 0 | 0 |
|2021-09-12 16:20:00| 0 | 0.2 | 0 | 0 |
|2021-09-12 16:25:00| 0 | 0.2 | 0 | 0 |
|2021-09-12 16:30:00| 0 | 0.2 | 0 | 0 |
|2021-09-12 16:35:00| 0.2 | 0.2 | 0 | 0 |
|2021-09-12 16:40:00| 0.3 | 0.2 | 0 | 0.2 |
|2021-09-12 16:45:00| 0.4 | 0.3 | 0 | 0 |
etc...
来源可以非常多(我的示例有 4 个,但可以有数千个来源)
如果您需要更多信息,请告诉我。 非常感谢!
假设您的数据可以在名为 logs
的数据框中访问,您可以通过在 timestamp_rounded
上进行初始分组然后在 source
上进行调整以将汇总的错误率转换为每个 timestamp_rounded
错误率的行和列的每个 source
。最后,您可以用 0.0
在执行这些转换之前,我们可以将您的 Yes
/No
值转换为 1
/0
以简化 aggregation/mean 并重命名 source
带有前缀 Error_rate_
的列值,以在数据透视后获得所需的列名称。
NB.我在问题的样本数据中更改了你的1条记录
| A | No |2021-09-12T12:57:...|2021-09-12 15:00:00|
至
| A | Yes |2021-09-12T12:57:...|2021-09-12 15:00:00|
接收更多数据变化。因此,您的数据框在初始聚合后看起来像这样。
您可以使用以下方法实现此目的:
output_df =(
logs.withColumn("Error",F.when(F.col("Error")=="Yes",1).otherwise(0))
.withColumn("Source",F.concat(F.lit("Error_rate_"),F.col("Source")))
.groupBy("timestamp_rounded")
.pivot("Source")
.agg(
F.round(F.mean("Error"),2).alias("Error_rate")
)
.na.fill(0.0)
)
输出
+-------------------+------------+------------+------------+------------+
|timestamp_rounded |Error_rate_A|Error_rate_B|Error_rate_C|Error_rate_D|
+-------------------+------------+------------+------------+------------+
|2021-09-12 14:50:00|0.0 |0.0 |0.0 |0.0 |
|2021-09-12 16:15:00|1.0 |0.0 |0.0 |0.0 |
|2021-09-12 16:20:00|0.0 |0.0 |0.0 |0.0 |
|2021-09-12 16:25:00|0.5 |0.0 |0.0 |0.0 |
|2021-09-12 14:55:00|0.0 |0.0 |0.0 |0.0 |
|2021-09-12 14:45:00|0.0 |0.0 |0.0 |0.0 |
|2021-09-12 16:10:00|0.0 |0.0 |0.0 |0.0 |
|2021-09-12 15:00:00|0.33 |0.0 |0.0 |0.0 |
+-------------------+------------+------------+------------+------------+
NB. 上面的输出未排序,可以使用 .orderBy
让我知道这是否适合你。