How to drop rows on a condition with groupby in Pyspark
I have a dataframe called df with the following content:
accountname | clustername | namespace | cost
account1 | cluster_1_1 | ns_1_1 | 10
account1 | cluster_1_1 | ns_1_2 | 11
account1 | cluster_1_1 | infra | 12
account1 | cluster_1_2 | infra | 12
account2 | cluster_2_1 | infra | 13
account3 | cluster_3_1 | ns_3_1 | 10
account3 | cluster_3_1 | ns_3_2 | 11
account3 | cluster_3_1 | infra | 12
Grouping df by the accountname field, I need to filter by the clustername field within each accountname and do the following:
When a clustername has more than one row within its accountname, drop the row with namespace = infra; if a clustername has only one row within its accountname, keep that row. Like this:
accountname | clustername | namespace | cost
account1 | cluster_1_1 | ns_1_1 | 10
account1 | cluster_1_1 | ns_1_2 | 11
account1 | cluster_1_2 | infra | 12
account2 | cluster_2_1 | infra | 13
account3 | cluster_3_1 | ns_3_1 | 10
account3 | cluster_3_1 | ns_3_2 | 11
Since cluster_1_1 has more than one row and one of them has the value "infra" in namespace, that row is dropped.
But in the case of cluster_1_2 and cluster_2_1, since each has only one row, that row is kept.
My code is like this:
from pyspark.sql import SparkSession, Row
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
fields = Row('accountname','clustername','namespace','cost')
s1 = fields("account1","cluster_1_1","ns_1_1",10)
s2 = fields("account1","cluster_1_1","ns_1_2",11)
s3 = fields("account1","cluster_1_1","infra",12)
s4 = fields("account1","cluster_1_2","infra",12)
s5 = fields("account2","cluster_2_1","infra",13)
s6 = fields("account3","cluster_3_1","ns_3_1",10)
s7 = fields("account3","cluster_3_1","ns_3_2",11)
s8 = fields("account3","cluster_3_1","infra",12)
fieldsData=[s1,s2,s3,s4,s5,s6,s7,s8]
df=spark.createDataFrame(fieldsData)
df.show()
Thanks in advance.
Check this out: you can first use a window function to compute the count of clustername partitioned by accountname & clustername, then filter with the negation of the condition "count greater than 1 and namespace = 'infra'":
from pyspark.sql import functions as F
from pyspark.sql.window import Window

w = Window.partitionBy("accountname", "clustername")

df.withColumn("count", F.count("clustername").over(w))\
  .filter(~((F.col("count") > 1) & (F.col("namespace") == 'infra')))\
  .drop("count").orderBy(F.col("accountname")).show()
+-----------+-----------+---------+----+
|accountname|clustername|namespace|cost|
+-----------+-----------+---------+----+
| account1|cluster_1_1| ns_1_1| 10|
| account1|cluster_1_1| ns_1_2| 11|
| account1|cluster_1_2| infra| 12|
| account2|cluster_2_1| infra| 13|
| account3|cluster_3_1| ns_3_1| 10|
| account3|cluster_3_1| ns_3_2| 11|
+-----------+-----------+---------+----+
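If you prefer an aggregation-based variant instead of the window, a minimal sketch (assuming the same df as above; the counts and result names are just illustrative) is to compute the row count per accountname/clustername with groupBy, join it back, and apply the same negated filter:

from pyspark.sql import functions as F

# count rows per (accountname, clustername) group
counts = df.groupBy("accountname", "clustername").agg(F.count("*").alias("count"))

# keep a row unless its group has more than one row and its namespace is 'infra'
result = df.join(counts, ["accountname", "clustername"])\
    .filter(~((F.col("count") > 1) & (F.col("namespace") == "infra")))\
    .drop("count")\
    .orderBy("accountname")

result.show()

This should give the same six rows as the window-based version.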