Spark:如何将列的唯一值拆分为大小相等的存储桶并将它们映射回新列?
Spark: How can I split a column's unique values into equal size buckets and map them back to a new column?
我有一个包含 2 列的 table:一个 ID (uuid) 和一个值 (int)。我想添加一个第三列,它是一个组。我想将行分成 3 组,每组的大小均由唯一 ID 值决定。
例如,假设我有 99 个唯一 ID,总共 200 行。分配组后,最终可能会有一组分配给 33 行,另一组分配给 100 行,第三组分配给 67 行。但是,所有 3 个组都应具有相同数量的 (33) 个唯一 ID。
Example dataset:
id -> assigned group
---------------------
abc -> group a
def -> group b
ghi -> group c
jkl -> group a
mno -> group b
pqr -> group c
...
Original Table: Updated table:
id(uuid) | val id(uuid) | val | group
-----------+--------- --> ---------+-----+---------
abc | 1 abc | 1 | a
pqr | 1 pqr | 1 | c
abc | 2 abc | 3 | a
mno | 5 mno | 5 | b
def | 1 def | 1 | b
mno | 3 mno | 3 | b
def | 4 def | 4 | b
pqr | 3 pqr | 3 | c
ghi | 5 ghi | 5 | c
jkl | 1 jkl | 1 | a
mno | 4 mno | 4 | b
jkl | 6 jkl | 6 | a
def | 3 def | 3 | b
mno | 2 mno | 2 | b
...
Rows: 14
Num buckets: 3 [a, b, c]
Bucket a --> ids: 2, rows: 4
Bucket b --> ids: 2, rows: 7
Bucket c --> ids: 2, rows: 3
您可以分两步完成。
- 首先,提取所有唯一 ID,并将它们中的每一个关联到索引 K,范围从 1 到唯一 ID 的数量(或 0 到该数量减去 1)。然后,分配的组是索引模 3.
- 您将该结果加入原始数据框,您就得到了结果。
# Step 1
groups = df\
.select("id(uuid)")
.distinct()
.rdd.map(lambda x: x[0])
.zipWithIndex()
.mapValues(lambda x : x % 3)
.toDF(["id(uuid)", "group"])
groups.show()
产生:
+--------+-----+
|id(uuid)|group|
+--------+-----+
| pqr| 0|
| jkl| 1|
| ghi| 2|
| mno| 0|
| abc| 1|
| def| 2|
+--------+-----+
然后:
# Step 2
result = df.join(groups, 'id(uuid)')
注意,如果你想为你的组命名而不是整数,你可以简单地创建一个像这样的组映射:
group_map = [ (0, 'a'), (1, 'b'), (2, 'c') ]
group_map_df = spark.createDataFrame(group_map, ['group', 'new_group'])
new_result = result.join(group_map_df, ['group'])
我有一个包含 2 列的 table:一个 ID (uuid) 和一个值 (int)。我想添加一个第三列,它是一个组。我想将行分成 3 组,每组的大小均由唯一 ID 值决定。
例如,假设我有 99 个唯一 ID,总共 200 行。分配组后,最终可能会有一组分配给 33 行,另一组分配给 100 行,第三组分配给 67 行。但是,所有 3 个组都应具有相同数量的 (33) 个唯一 ID。
Example dataset:
id -> assigned group
---------------------
abc -> group a
def -> group b
ghi -> group c
jkl -> group a
mno -> group b
pqr -> group c
...
Original Table: Updated table:
id(uuid) | val id(uuid) | val | group
-----------+--------- --> ---------+-----+---------
abc | 1 abc | 1 | a
pqr | 1 pqr | 1 | c
abc | 2 abc | 3 | a
mno | 5 mno | 5 | b
def | 1 def | 1 | b
mno | 3 mno | 3 | b
def | 4 def | 4 | b
pqr | 3 pqr | 3 | c
ghi | 5 ghi | 5 | c
jkl | 1 jkl | 1 | a
mno | 4 mno | 4 | b
jkl | 6 jkl | 6 | a
def | 3 def | 3 | b
mno | 2 mno | 2 | b
...
Rows: 14
Num buckets: 3 [a, b, c]
Bucket a --> ids: 2, rows: 4
Bucket b --> ids: 2, rows: 7
Bucket c --> ids: 2, rows: 3
您可以分两步完成。
- 首先,提取所有唯一 ID,并将它们中的每一个关联到索引 K,范围从 1 到唯一 ID 的数量(或 0 到该数量减去 1)。然后,分配的组是索引模 3.
- 您将该结果加入原始数据框,您就得到了结果。
# Step 1
groups = df\
.select("id(uuid)")
.distinct()
.rdd.map(lambda x: x[0])
.zipWithIndex()
.mapValues(lambda x : x % 3)
.toDF(["id(uuid)", "group"])
groups.show()
产生:
+--------+-----+
|id(uuid)|group|
+--------+-----+
| pqr| 0|
| jkl| 1|
| ghi| 2|
| mno| 0|
| abc| 1|
| def| 2|
+--------+-----+
然后:
# Step 2
result = df.join(groups, 'id(uuid)')
注意,如果你想为你的组命名而不是整数,你可以简单地创建一个像这样的组映射:
group_map = [ (0, 'a'), (1, 'b'), (2, 'c') ]
group_map_df = spark.createDataFrame(group_map, ['group', 'new_group'])
new_result = result.join(group_map_df, ['group'])