根据另一列 Spark Scala 中的时间戳过滤行
Filter rows based on a time stamp in another column Spark Scala
假设我在 Spark Scala 中有以下数据框:
+--------+--------------------+--------------------+
|Index | Date| Date_x|
+--------+--------------------+--------------------+
| 1|2018-01-31T20:33:...|2018-01-31T21:18:...|
| 1|2018-01-31T20:35:...|2018-01-31T21:18:...|
| 1|2018-01-31T21:04:...|2018-01-31T21:18:...|
| 1|2018-01-31T21:05:...|2018-01-31T21:18:...|
| 1|2018-01-31T21:15:...|2018-01-31T21:18:...|
| 1|2018-01-31T21:16:...|2018-01-31T21:18:...|
| 1|2018-01-31T21:19:...|2018-01-31T21:18:...|
| 1|2018-01-31T21:20:...|2018-01-31T21:18:...|
| 2|2018-01-31T19:43:...|2018-01-31T20:35:...|
| 2|2018-01-31T19:44:...|2018-01-31T20:35:...|
| 2|2018-01-31T20:36:...|2018-01-31T20:35:...|
+--------+--------------------+--------------------+
我想为每个索引删除 Date < Date_x
的行,如下图所示:
+--------+--------------------+--------------------+
|Index | Date| Date_x|
+--------+--------------------+--------------------+
| 1|2018-01-31T21:19:...|2018-01-31T21:18:...|
| 1|2018-01-31T21:20:...|2018-01-31T21:18:...|
| 2|2018-01-31T20:36:...|2018-01-31T20:35:...|
+--------+--------------------+--------------------+
我尝试使用 monotonically_increasing_id()
添加一列 x_idx
,并为每个 Index
(其中 Date < Date_x
)获取 min(x_idx)
。这样我就可以随后从不满足条件的数据框中删除行。但这似乎对我不起作用。我可能想念对 agg()
工作原理的理解。感谢您的帮助!
val test_df = df.withColumn("x_idx", monotonically_increasing_id())
val newIdx = test_df
.filter($"Date" > "Date_x")
.groupBy($"Index")
.agg(min($"x_idx"))
.toDF("n_Index", "min_x_idx")
newIdx.show
+-------+--------+
|n_Index|min_x_idx|
+-------+--------+
+-------+--------+
您忘记在
中添加 $
.filter($"Date" > "Date_x")
所以正确的 filter
是
.filter($"Date" > $"Date_x")
您可以使用 alias
而不是调用 toDF
作为
val newIdx = test_df
.filter($"Date" > $"Date_x")
.groupBy($"Index".as("n_Index"))
.agg(min($"x_idx").as("min_x_idx"))
你应该得到输出
+-------+---------+
|n_Index|min_x_idx|
+-------+---------+
|1 |6 |
|2 |10 |
+-------+---------+
过滤条件可能会过滤所有记录。请检查是否在筛选记录后打印数据框,并确保您的筛选器按预期工作。
val newIdx = test_df
.filter($"Date" > $"Date_x")
.show
假设我在 Spark Scala 中有以下数据框:
+--------+--------------------+--------------------+
|Index | Date| Date_x|
+--------+--------------------+--------------------+
| 1|2018-01-31T20:33:...|2018-01-31T21:18:...|
| 1|2018-01-31T20:35:...|2018-01-31T21:18:...|
| 1|2018-01-31T21:04:...|2018-01-31T21:18:...|
| 1|2018-01-31T21:05:...|2018-01-31T21:18:...|
| 1|2018-01-31T21:15:...|2018-01-31T21:18:...|
| 1|2018-01-31T21:16:...|2018-01-31T21:18:...|
| 1|2018-01-31T21:19:...|2018-01-31T21:18:...|
| 1|2018-01-31T21:20:...|2018-01-31T21:18:...|
| 2|2018-01-31T19:43:...|2018-01-31T20:35:...|
| 2|2018-01-31T19:44:...|2018-01-31T20:35:...|
| 2|2018-01-31T20:36:...|2018-01-31T20:35:...|
+--------+--------------------+--------------------+
我想为每个索引删除 Date < Date_x
的行,如下图所示:
+--------+--------------------+--------------------+
|Index | Date| Date_x|
+--------+--------------------+--------------------+
| 1|2018-01-31T21:19:...|2018-01-31T21:18:...|
| 1|2018-01-31T21:20:...|2018-01-31T21:18:...|
| 2|2018-01-31T20:36:...|2018-01-31T20:35:...|
+--------+--------------------+--------------------+
我尝试使用 monotonically_increasing_id()
添加一列 x_idx
,并为每个 Index
(其中 Date < Date_x
)获取 min(x_idx)
。这样我就可以随后从不满足条件的数据框中删除行。但这似乎对我不起作用。我可能想念对 agg()
工作原理的理解。感谢您的帮助!
val test_df = df.withColumn("x_idx", monotonically_increasing_id())
val newIdx = test_df
.filter($"Date" > "Date_x")
.groupBy($"Index")
.agg(min($"x_idx"))
.toDF("n_Index", "min_x_idx")
newIdx.show
+-------+--------+
|n_Index|min_x_idx|
+-------+--------+
+-------+--------+
您忘记在
中添加$
.filter($"Date" > "Date_x")
所以正确的 filter
是
.filter($"Date" > $"Date_x")
您可以使用 alias
而不是调用 toDF
作为
val newIdx = test_df
.filter($"Date" > $"Date_x")
.groupBy($"Index".as("n_Index"))
.agg(min($"x_idx").as("min_x_idx"))
你应该得到输出
+-------+---------+
|n_Index|min_x_idx|
+-------+---------+
|1 |6 |
|2 |10 |
+-------+---------+
过滤条件可能会过滤所有记录。请检查是否在筛选记录后打印数据框,并确保您的筛选器按预期工作。
val newIdx = test_df
.filter($"Date" > $"Date_x")
.show