从一列中查找最大值并根据最大值填充另一列
Finding the max value from a column and populating another column based on the max value
我在 csv 文件中有增量加载。我在数据框中读取了 csv。数据框有一列包含一些字符串。我必须从此列中找到不同的字符串,并在加入另一个数据帧后为从 0
开始的每个值分配一个 ID
(整数)。
在下一个 运行 中,我必须在找到 ID
列中的最大值并为不同的字符串递增后分配 ID。只要 ID
列中有 null,我就必须将它从前一个 运行.
的值递增 (+1)
第一个运行
string
ID
zero
0
first
1
second
2
third
3
fourth
4
第二个运行
MAX(ID) = 4
string
ID
zero
0
first
1
second
2
third
3
fourth
4
fifth
5
sixth
6
seventh
7
eighth
8
我已经尝试过了,但无法正常工作..
max = df.agg({"ID": "max"}).collect()[0][0]
df_incremented = df.withcolumn("ID", when(col("ID").isNull(),expr("max += 1")))
如果有实现此目的的简单方法,请告诉我。
因为您只保留不同的值,所以您可以在 window 上使用 row_number
函数:
from pyspark.sql import Window
from pyspark.sql import functions as F
df = spark.createDataFrame(
[("a",), ("a",), ("b",), ("c",), ("d",), ("e",), ("e",)],
("string",)
)
w = Window.orderBy("string")
df1 = df.distinct().withColumn("ID", F.row_number().over(w) - 1)
df1.show()
#+------+---+
#|string| ID|
#+------+---+
#| a| 0|
#| b| 1|
#| c| 2|
#| d| 3|
#| e| 4|
#+------+---+
现在让我们将一些行添加到此数据框中,并使用 row_number
和 coalesce
仅将 ID
分配给它为空的行(无需获取最大值):
df2 = df1.union(spark.sql("select * from values ('f', null), ('h', null), ('i', null)"))
df3 = df2.withColumn("ID", F.coalesce("ID", F.row_number(w) - 1))
df3.show()
#+------+---+
#|string| ID|
#+------+---+
#| a| 0|
#| b| 1|
#| c| 2|
#| d| 3|
#| e| 4|
#| f| 5|
#| h| 6|
#| i| 7|
#+------+---+
如果您也想保留重复的值并为它们分配相同的 ID
,请使用 dense_rank
而不是 row_number
。
我在 csv 文件中有增量加载。我在数据框中读取了 csv。数据框有一列包含一些字符串。我必须从此列中找到不同的字符串,并在加入另一个数据帧后为从 0
开始的每个值分配一个 ID
(整数)。
在下一个 运行 中,我必须在找到 ID
列中的最大值并为不同的字符串递增后分配 ID。只要 ID
列中有 null,我就必须将它从前一个 运行.
第一个运行
string | ID |
---|---|
zero | 0 |
first | 1 |
second | 2 |
third | 3 |
fourth | 4 |
第二个运行
MAX(ID) = 4
string | ID |
---|---|
zero | 0 |
first | 1 |
second | 2 |
third | 3 |
fourth | 4 |
fifth | 5 |
sixth | 6 |
seventh | 7 |
eighth | 8 |
我已经尝试过了,但无法正常工作..
max = df.agg({"ID": "max"}).collect()[0][0]
df_incremented = df.withcolumn("ID", when(col("ID").isNull(),expr("max += 1")))
如果有实现此目的的简单方法,请告诉我。
因为您只保留不同的值,所以您可以在 window 上使用 row_number
函数:
from pyspark.sql import Window
from pyspark.sql import functions as F
df = spark.createDataFrame(
[("a",), ("a",), ("b",), ("c",), ("d",), ("e",), ("e",)],
("string",)
)
w = Window.orderBy("string")
df1 = df.distinct().withColumn("ID", F.row_number().over(w) - 1)
df1.show()
#+------+---+
#|string| ID|
#+------+---+
#| a| 0|
#| b| 1|
#| c| 2|
#| d| 3|
#| e| 4|
#+------+---+
现在让我们将一些行添加到此数据框中,并使用 row_number
和 coalesce
仅将 ID
分配给它为空的行(无需获取最大值):
df2 = df1.union(spark.sql("select * from values ('f', null), ('h', null), ('i', null)"))
df3 = df2.withColumn("ID", F.coalesce("ID", F.row_number(w) - 1))
df3.show()
#+------+---+
#|string| ID|
#+------+---+
#| a| 0|
#| b| 1|
#| c| 2|
#| d| 3|
#| e| 4|
#| f| 5|
#| h| 6|
#| i| 7|
#+------+---+
如果您也想保留重复的值并为它们分配相同的 ID
,请使用 dense_rank
而不是 row_number
。