Spark:如何将列的唯一值拆分为大小相等的存储桶并将它们映射回新列?

Spark: How can I split a column's unique values into equal size buckets and map them back to a new column?

我有一个包含 2 列的 table:一个 ID (uuid) 和一个值 (int)。我想添加一个第三列,它是一个组。我想将行分成 3 组,每组的大小均由唯一 ID 值决定。

例如,假设我有 99 个唯一 ID,总共 200 行。分配组后,最终可能会有一组分配给 33 行,另一组分配给 100 行,第三组分配给 67 行。但是,所有 3 个组都应具有相同数量的 (33) 个唯一 ID。

Example dataset:

id  -> assigned group
---------------------
abc -> group a
def -> group b
ghi -> group c
jkl -> group a
mno -> group b
pqr -> group c
...

Original Table:                   Updated table:

id(uuid)   | val                  id(uuid) | val | group
-----------+---------     -->     ---------+-----+---------
abc        | 1                    abc      | 1   | a
pqr        | 1                    pqr      | 1   | c
abc        | 2                    abc      | 3   | a
mno        | 5                    mno      | 5   | b
def        | 1                    def      | 1   | b
mno        | 3                    mno      | 3   | b
def        | 4                    def      | 4   | b
pqr        | 3                    pqr      | 3   | c
ghi        | 5                    ghi      | 5   | c
jkl        | 1                    jkl      | 1   | a
mno        | 4                    mno      | 4   | b
jkl        | 6                    jkl      | 6   | a
def        | 3                    def      | 3   | b
mno        | 2                    mno      | 2   | b
...

Rows: 14
Num buckets: 3 [a, b, c]
Bucket a --> ids: 2, rows: 4
Bucket b --> ids: 2, rows: 7
Bucket c --> ids: 2, rows: 3

您可以分两步完成。

  1. 首先,提取所有唯一 ID,并将它们中的每一个关联到索引 K,范围从 1 到唯一 ID 的数量(或 0 到该数量减去 1)。然后,分配的组是索引模 3.
  2. 您将该结果加入原始数据框,您就得到了结果。
# Step 1
groups = df\
    .select("id(uuid)")
    .distinct()
    .rdd.map(lambda x: x[0])
    .zipWithIndex()
    .mapValues(lambda x : x % 3)
    .toDF(["id(uuid)", "group"])
groups.show()

产生:

+--------+-----+
|id(uuid)|group|
+--------+-----+
|     pqr|    0|
|     jkl|    1|
|     ghi|    2|
|     mno|    0|
|     abc|    1|
|     def|    2|
+--------+-----+

然后:

# Step 2
result = df.join(groups, 'id(uuid)')

注意,如果你想为你的组命名而不是整数,你可以简单地创建一个像这样的组映射:

group_map = [ (0, 'a'), (1, 'b'), (2, 'c') ]
group_map_df = spark.createDataFrame(group_map, ['group', 'new_group'])
new_result = result.join(group_map_df, ['group'])