从一列中查找最大值并根据最大值填充另一列

Finding the max value from a column and populating another column based on the max value

我在 csv 文件中有增量加载。我在数据框中读取了 csv。数据框有一列包含一些字符串。我必须从此列中找到不同的字符串,并在加入另一个数据帧后为从 0 开始的每个值分配一个 ID (整数)。

在下一个 运行 中,我必须在找到 ID 列中的最大值并为不同的字符串递增后分配 ID。只要 ID 列中有 null,我就必须将它从前一个 运行.

的值递增 (+1)

第一个运行

string ID
zero 0
first 1
second 2
third 3
fourth 4

第二个运行

MAX(ID) = 4

string ID
zero 0
first 1
second 2
third 3
fourth 4
fifth 5
sixth 6
seventh 7
eighth 8

我已经尝试过了,但无法正常工作..

max = df.agg({"ID": "max"}).collect()[0][0]
df_incremented = df.withcolumn("ID", when(col("ID").isNull(),expr("max += 1")))

如果有实现此目的的简单方法,请告诉我。

因为您只保留不同的值,所以您可以在 window 上使用 row_number 函数:

from pyspark.sql import Window
from pyspark.sql import functions as F

 df = spark.createDataFrame(
    [("a",), ("a",), ("b",), ("c",), ("d",), ("e",), ("e",)],
    ("string",)
)

w = Window.orderBy("string")

df1 = df.distinct().withColumn("ID", F.row_number().over(w) - 1)

df1.show()
#+------+---+
#|string| ID|
#+------+---+
#|     a|  0|
#|     b|  1|
#|     c|  2|
#|     d|  3|
#|     e|  4|
#+------+---+

现在让我们将一些行添加到此数据框中,并使用 row_numbercoalesce 仅将 ID 分配给它为空的行(无需获取最大值):

df2 = df1.union(spark.sql("select * from values ('f', null), ('h', null), ('i', null)"))

df3 = df2.withColumn("ID", F.coalesce("ID", F.row_number(w) - 1))

df3.show()
#+------+---+
#|string| ID|
#+------+---+
#|     a|  0|
#|     b|  1|
#|     c|  2|
#|     d|  3|
#|     e|  4|
#|     f|  5|
#|     h|  6|
#|     i|  7|
#+------+---+

如果您也想保留重复的值并为它们分配相同的 ID,请使用 dense_rank 而不是 row_number