How to filter multiple rows based on row and column conditions in PySpark
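I want to filter multiple rows based on the value column. For example, I want to keep the rows whose channel_name is velocity where value >= 1 and value <= 5, and the rows whose channel_name is Temp where value >= 0 and value <= 2. Below is my PySpark DataFrame.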
start_timestamp | channel_name | value |
---|---|---|
2020-11-02 08:51:50 | velocity | 1 |
2020-11-02 09:14:29 | Temp | 0 |
2020-11-02 09:18:32 | velocity | 0 |
2020-11-02 09:32:42 | velocity | 4 |
2020-11-03 13:06:03 | Temp | 2 |
2020-11-03 13:10:01 | Temp | 1 |
2020-11-03 13:54:38 | Temp | 5 |
2020-11-03 14:46:25 | velocity | 5 |
2020-11-03 14:57:31 | Kilometer | 6 |
2020-11-03 15:07:07 | Kilometer | 7 |
Expected DataFrame:
start_timestamp | channel_name | value |
---|---|---|
2020-11-02 08:51:50 | velocity | 1 |
2020-11-02 09:32:42 | velocity | 4 |
2020-11-03 14:46:25 | velocity | 5 |
2020-11-02 09:14:29 | Temp | 0 |
2020-11-03 13:06:03 | Temp | 2 |
2020-11-03 13:10:01 | Temp | 1 |
I tried this for the channel_name velocity alone, and it works fine:
df1=df.filter((df.channel_name == "velocity") & (df.interpreted_value >= 1 ) & (df.interpreted_value <= 5))
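But I don't know how to do this for multiple channel_name values, such as velocity and Temp. Below is my code. Please also let me know whether this is the right approach, or how I should do it instead.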
df1=df.filter(((df.channel_name == "velocity") & (df.interpreted_value >= 1 ) &
(df.interpreted_value <= 5))) &
((df.channel_name == "Temp") & (df.interpreted_value >= 0 ) &
(df.interpreted_value <= 2))))
You need to use the or (|) operator instead of the and (&) operator when you combine the two clauses:
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

# Reuse the active session, or create one when running outside a shell or notebook
spark = SparkSession.builder.getOrCreate()

# Sample data from the question
df = spark.createDataFrame([
    ("2020-11-02 08:51:50", "velocity", 1),
    ("2020-11-02 09:14:29", "Temp", 0),
    ("2020-11-02 09:18:32", "velocity", 0),
    ("2020-11-02 09:32:42", "velocity", 4),
    ("2020-11-03 13:06:03", "Temp", 2),
    ("2020-11-03 13:10:01", "Temp", 1),
    ("2020-11-03 13:54:38", "Temp", 5),
    ("2020-11-03 14:46:25", "velocity", 5),
    ("2020-11-03 14:57:31", "Kilometer", 6),
    ("2020-11-03 15:07:07", "Kilometer", 7)],
    ["start_timestamp", "channel_name", "value"]
).withColumn("start_timestamp", F.to_timestamp("start_timestamp"))

# Keep a row if it satisfies either channel's range check
df_filtered = df.filter(
    ((df.channel_name == "velocity") & (df.value >= 1) & (df.value <= 5))
    | ((df.channel_name == "Temp") & (df.value >= 0) & (df.value <= 2))  # | (or) instead of & (and)
)
df_filtered.show()
Output:
+-------------------+------------+-----+
|    start_timestamp|channel_name|value|
+-------------------+------------+-----+
|2020-11-02 08:51:50|    velocity|    1|
|2020-11-02 09:14:29|        Temp|    0|
|2020-11-02 09:32:42|    velocity|    4|
|2020-11-03 13:06:03|        Temp|    2|
|2020-11-03 13:10:01|        Temp|    1|
|2020-11-03 14:46:25|    velocity|    5|
+-------------------+------------+-----+
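If more channels each need their own range, writing every clause by hand gets repetitive. As a minimal sketch, the per-channel clauses can be generated as a SQL expression string, which DataFrame.filter accepts in place of a Column. The helper build_range_filter and the ranges dictionary are hypothetical names introduced here for illustration, not part of the original answer. BETWEEN is inclusive on both ends, so it matches the >= and <= checks above.

```python
def build_range_filter(ranges, name_col="channel_name", value_col="value"):
    """Build a SQL filter string: one inclusive range clause per channel, joined with OR.

    Assumes channel names contain no single quotes and the bounds are plain numbers.
    """
    clauses = [
        f"({name_col} = '{channel}' AND {value_col} BETWEEN {low} AND {high})"
        for channel, (low, high) in ranges.items()
    ]
    return " OR ".join(clauses)


# Hypothetical range table mirroring the question: channel -> (min, max)
ranges = {"velocity": (1, 5), "Temp": (0, 2)}
condition = build_range_filter(ranges)
print(condition)
```

Passing the resulting string to the DataFrame above, as in df.filter(condition), should keep the same six rows as the explicit version.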
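The filter you are currently applying returns nothing, because it requires channel_name to equal one specific string and, at the same time, to equal a different string. No single row can satisfy both, so the combined condition is always false. With or, only one of the clauses needs to be true for a row to be included in the resulting DataFrame.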
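The difference can be checked without Spark. The sketch below replays the two conditions in plain Python over the sample rows from the question. The rows list and the helper names are illustrative only. Combining the clauses with and matches nothing, while or matches the six expected rows.

```python
# Sample rows from the question as (channel_name, value) pairs.
rows = [
    ("velocity", 1), ("Temp", 0), ("velocity", 0), ("velocity", 4),
    ("Temp", 2), ("Temp", 1), ("Temp", 5), ("velocity", 5),
    ("Kilometer", 6), ("Kilometer", 7),
]


def is_velocity_in_range(name, value):
    return name == "velocity" and 1 <= value <= 5


def is_temp_in_range(name, value):
    return name == "Temp" and 0 <= value <= 2


# AND: a row would have to be both "velocity" and "Temp", which is impossible.
matched_with_and = [r for r in rows if is_velocity_in_range(*r) and is_temp_in_range(*r)]

# OR: a row only has to satisfy one of the two clauses.
matched_with_or = [r for r in rows if is_velocity_in_range(*r) or is_temp_in_range(*r)]

print(len(matched_with_and))  # 0
print(len(matched_with_or))   # 6
```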