pyspark: what is the best way to select n distinct IDs from a dataset

I have a DataFrame in pyspark with the following data:

    id manager score
    A     x     3
    A     y     1
    B     a     2
    B     b     5
    C     f     2
    D     f     6

What I expect is that the resulting dataset contains exactly n IDs.

For example, if I say I need 3 IDs, the resulting dataset would look like this:

id manager score
 A   x      3
 A   y      1
 B   a      2
 B   b      5
 C   f      2

So this dataset has 5 rows, but exactly 3 IDs. If I do df.limit(3), I just get the first 3 records; since IDs repeat, that gives me fewer than 3 distinct IDs — only 2 in this case. How can I do this? Thanks.

You can do this with a Spark SQL query.

Just change the limit clause value in the subquery to select a different number of distinct ids.

df=spark.createDataFrame([("A", "x", "3"), ("A", "y", "1"), ("B", "a", "2"), ("B", "b", "5"), ("C", "v", "2"), ("D", "f", "6")], ["id", "manager", "score"])

+---+-------+-----+
| id|manager|score|
+---+-------+-----+
|  A|      x|    3|
|  A|      y|    1|
|  B|      a|    2|
|  B|      b|    5|
|  C|      v|    2|
|  D|      f|    6|
+---+-------+-----+

df.createOrReplaceTempView("employee")

spark.sql("""select * from employee e1 
     inner join (  
        select distinct id as uni_id from employee order by uni_id limit 3) e2 
     on e1.id=e2.uni_id""").drop("uni_id").show() 

+---+-------+-----+
| id|manager|score|
+---+-------+-----+
|  A|      x|    3|
|  A|      y|    1|
|  B|      a|    2|
|  B|      b|    5|
|  C|      v|    2|
+---+-------+-----+

You can also try StringIndexer, which encodes each string in the id column as a numeric index, and then filter based on the limit:

import pyspark.sql.functions as F
from pyspark.ml.feature import StringIndexer

n = 3 #change as per limit
idx = StringIndexer(inputCol="id",outputCol="id_num")
idx.fit(df).transform(df).filter(F.col("id_num")<n).drop("id_num").show()

+---+-------+-----+
| id|manager|score|
+---+-------+-----+
|  A|      x|    3|
|  A|      y|    1|
|  B|      a|    2|
|  B|      b|    5|
|  C|      f|    2|
+---+-------+-----+

Here is a simple approach using the `collect_set` function and some pythonic operations:

idLimit=3 #define your limit

from pyspark.sql.functions import collect_set

id_lst=(sourceDF  #collect a list of distinct ids
        .select(collect_set('id'))
        .collect()[0][0]
       )

id_lst.sort() #sort the ids alphabetically

id_lst_limited=id_lst[:idLimit] #limit the list as per your defined limit

targetDF=(sourceDF #filter the source df using your limited list
          .filter("id in ({0})".format(str(id_lst_limited)[1:-1]))
         )

I noticed that one of the answers above is based on Spark SQL.

Here is another Spark SQL based approach, but with a WINDOW clause -

spark.sql("select id, manager, score from (select e1.id, e1.manager, e1.score, dense_rank() over (order by e1.id) as rrank from employee e1) where rrank <= 3").show()

Full code -

df=spark.createDataFrame([("A", "x", "3"), ("A", "y", "1"), ("B", "a", "2"), ("B", "b", "5"), ("C", "v", "2"), ("D", "f", "6")], ["id", "manager", "score"])

+---+-------+-----+
| id|manager|score|
+---+-------+-----+
|  A|      x|    3|
|  A|      y|    1|
|  B|      a|    2|
|  B|      b|    5|
|  C|      v|    2|
|  D|      f|    6|
+---+-------+-----+

df.createOrReplaceTempView("employee")


spark.sql("select id, manager, score from (select e1.id, e1.manager, e1.score, dense_rank() over (order by e1.id) as rrank from employee e1) where rrank <= 3").show()

+---+-------+-----+
| id|manager|score|
+---+-------+-----+
|  A|      x|    3|
|  A|      y|    1|
|  B|      a|    2|
|  B|      b|    5|
|  C|      v|    2|
+---+-------+-----+

You can simply avoid the join by using where id in (select distinct id ...limit 3), as below -

 val df = Seq(("A", "x", "3"), ("A", "y", "1"), ("B", "a", "2"), ("B", "b", "5"), ("C", "v", "2"), ("D", "f", "6"))
    .toDF("id", "manager", "score")
    df.show(false)

    /**
      * +---+-------+-----+
      * |id |manager|score|
      * +---+-------+-----+
      * |A  |x      |3    |
      * |A  |y      |1    |
      * |B  |a      |2    |
      * |B  |b      |5    |
      * |C  |v      |2    |
      * |D  |f      |6    |
      * +---+-------+-----+
      */

    df.createOrReplaceTempView("employee")
    spark.sql("select * from employee where id in (select distinct id from employee order by id limit 3)")
      .show(false)

    /**
      * +---+-------+-----+
      * |id |manager|score|
      * +---+-------+-----+
      * |A  |x      |3    |
      * |A  |y      |1    |
      * |B  |a      |2    |
      * |B  |b      |5    |
      * |C  |v      |2    |
      * +---+-------+-----+
      */