How to drop rows conditionally with groupby in PySpark

I have a dataframe named df that contains the following:

accountname |   clustername |   namespace   |   cost
account1    |   cluster_1_1 |   ns_1_1      |   10
account1    |   cluster_1_1 |   ns_1_2      |   11
account1    |   cluster_1_1 |   infra       |   12
account1    |   cluster_1_2 |   infra       |   12
account2    |   cluster_2_1 |   infra       |   13
account3    |   cluster_3_1 |   ns_3_1      |   10
account3    |   cluster_3_1 |   ns_3_2      |   11
account3    |   cluster_3_1 |   infra       |   12

With df grouped by the accountname field, I need to filter on the clustername field within each accountname, as follows: when a clustername has more than one row for a given accountname, drop the rows where namespace = infra; when a clustername has only a single row within its accountname, keep that row. The expected result is:

accountname |   clustername |   namespace   |   cost
account1    |   cluster_1_1 |   ns_1_1      |   10
account1    |   cluster_1_1 |   ns_1_2      |   11
account1    |   cluster_1_2 |   infra       |   12
account2    |   cluster_2_1 |   infra       |   13
account3    |   cluster_3_1 |   ns_3_1      |   10
account3    |   cluster_3_1 |   ns_3_2      |   11

Since cluster_1_1 has more than one row and one of them has the value "infra" in namespace, that row was dropped. But in the case of cluster_1_2 and cluster_2_1, since they each have only one row, the row was kept. My code is as follows:

from pyspark.sql import SparkSession, Row

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

# build the sample rows
fields = Row('accountname', 'clustername', 'namespace', 'cost')
s1 = fields("account1","cluster_1_1","ns_1_1",10)
s2 = fields("account1","cluster_1_1","ns_1_2",11)
s3 = fields("account1","cluster_1_1","infra",12)
s4 = fields("account1","cluster_1_2","infra",12)
s5 = fields("account2","cluster_2_1","infra",13)
s6 = fields("account3","cluster_3_1","ns_3_1",10)
s7 = fields("account3","cluster_3_1","ns_3_2",11)
s8 = fields("account3","cluster_3_1","infra",12)

fieldsData = [s1, s2, s3, s4, s5, s6, s7, s8]
df = spark.createDataFrame(fieldsData)
df.show()

Thanks in advance.

Check this out. You can first use a window function partitioned by accountname & clustername to compute the row count per clustername, and then filter with the negation of the condition count > 1 AND namespace = 'infra'.

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# count of rows per (accountname, clustername) pair
w = Window.partitionBy("accountname", "clustername")

df.withColumn("count", F.count("clustername").over(w))\
    .filter(~((F.col("count") > 1) & (F.col("namespace") == 'infra')))\
    .drop("count").orderBy(F.col("accountname")).show()

+-----------+-----------+---------+----+
|accountname|clustername|namespace|cost|
+-----------+-----------+---------+----+
|   account1|cluster_1_1|   ns_1_1|  10|
|   account1|cluster_1_1|   ns_1_2|  11|
|   account1|cluster_1_2|    infra|  12|
|   account2|cluster_2_1|    infra|  13|
|   account3|cluster_3_1|   ns_3_1|  10|
|   account3|cluster_3_1|   ns_3_2|  11|
+-----------+-----------+---------+----+
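
If you prefer to avoid a window function, here is a minimal alternative sketch (not part of the original answer): compute the per-cluster row counts with a groupBy aggregation and join them back onto df before applying the same negated filter. The window approach above is usually the more direct option; this just shows the same logic expressed with an aggregation and a join.

from pyspark.sql import functions as F

# rows per (accountname, clustername) pair
counts = df.groupBy("accountname", "clustername") \
    .agg(F.count("*").alias("count"))

# join the counts back and drop 'infra' rows only where the cluster has more than one row
df.join(counts, ["accountname", "clustername"]) \
    .filter(~((F.col("count") > 1) & (F.col("namespace") == "infra"))) \
    .drop("count") \
    .orderBy("accountname") \
    .show()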