显示列的第一次出现

show first occurence(s) of a column

我想使用 pyspark 根据输入创建新的数据框,它打印出每个不同值列的第一次出现。 rownumber() 会工作还是 window()。不确定最好的方法还是 sparksql 是最好的。基本上第二个 table 是我希望输出的地方,它只打印出输入中第一次出现的值列。我只对第一次出现的“值”列感兴趣。如果一个值重复,只显示第一个看到的值。

+--------+--------+--------+
|   VALUE|   DAY  |  Color
+--------+--------+--------+
|20      |MON     |    BLUE|
|20      |TUES    |    BLUE|
|30      |WED     |    BLUE|
+--------+--------+--------+


+--------+--------+--------+
|   VALUE|   DAY  |  Color
+--------+--------+--------+
|20      |MON     |    BLUE|
|30      |WED     |    BLUE|
+--------+--------+--------+

在我看来,您希望 VALUE 删除重复项。如果是这样,请使用 dropDuplicates

 df.dropDuplicates(['VALUE']).show()


+-----+---+-----+
|VALUE|DAY|Color|
+-----+---+-----+
|   20|MON| BLUE|
|   30|WED| BLUE|
+-----+---+-----+

这是how to do it with a window。在这个例子中,他们以薪水为例。在您的情况下,我认为您将使用 'DAY' 作为 orderBy,使用 'Value' 作为 partitionBy。

from pyspark.sql import SparkSession,Row
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

data = [("James","Sales",3000),("Michael","Sales",4600),
      ("Robert","Sales",4100),("Maria","Finance",3000),
      ("Raman","Finance",3000),("Scott","Finance",3300),
      ("Jen","Finance",3900),("Jeff","Marketing",3000),
      ("Kumar","Marketing",2000)]

df = spark.createDataFrame(data,["Name","Department","Salary"])
df.show()
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number
w2 = Window.partitionBy("department").orderBy(col("salary"))
df.withColumn("row",row_number().over(w2)) \
  .filter(col("row") == 1).drop("row") \
  .show()
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|        James|     Sales|  3000|
|        Maria|   Finance|  3000|
|        Kumar| Marketing|  2000|
+-------------+----------+------+

是的,您需要开发一种排序日期的方法,但我认为您知道这是可能的,并且您选择了正确的工具。我总是喜欢警告人们,这使用 window 并且他们将所有数据吸取给 1 个执行者来完成工作。这不是特别有效。在小型数据集上,这可能是高效的。在较大的数据集上,可能需要很长时间才能完成。

这是我不使用 window 的方法。它可能会在大型数据集上表现更好,因为它可以使用更多的集群来完成工作。在您的情况下,您需要使用 'VALUE' 作为部门,使用 'Salary' 作为 'DATE'。

from pyspark.sql import SparkSession,Row
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

data = [("James","Sales",3000),("Michael","Sales",4600),
      ("Robert","Sales",4100),("Maria","Finance",3000),
      ("Raman","Finance",3000),("Scott","Finance",3300),
      ("Jen","Finance",3900),("Jeff","Marketing",3000),
      ("Kumar","Marketing",2000)]

df = spark.createDataFrame(data,["Name","Department","Salary"])
unGroupedDf = df.select( \
  df["Department"], \
  f.struct(*[\ # Make a struct with all the record elements.
    df["Salary"].alias("Salary"),\  #will be sorted on Salary first      
    df["Department"].alias("Dept"),\
    df["Name"].alias("Name")] )\
  .alias("record") )
unGroupedDf.groupBy("Department")\ #group
 .agg(f.collect_list("record")\  #Gather all the element in a group
  .alias("record"))\
  .select(\
    f.reverse(\ #Make the sort Descending
      f.array_sort(\ #Sort the array ascending
        f.col("record")\ #the struct
      )\
    )[0].alias("record"))\ #grab the "Max element in the array
    ).select( f.col("record.*") ).show() # use struct as Columns
  .show()
+---------+------+-------+
|     Dept|Salary|   Name|
+---------+------+-------+
|    Sales|  4600|Michael|
|  Finance|  3900|    Jen|
|Marketing|  3000|   Jeff|
+---------+------+-------+