Pyspark:从随机项目函数创建一个集合

Pyspark: Create a set from a random item function

pyspark 的新手,希望获得有关根据给定列表中的随机选择生成一组项目的任何指示。这些随机选择需要附加到一个列表,但必须是唯一的,所以在 python 实现中,我使用一个集合来启动,在 while 语句的上下文中

import string
import random
def id_generator(size=6, chars=string.ascii_uppercase + string.digits):
   return ''.join(random.choice(chars) for _ in range(size))
my_set=set()
while len(my_set)<n+1:  #n being the number of items desired
  my_set.add(id_generator())

(id_generator 语法归功于 )

我想做的是利用 spark 的分布式计算,更快地完成上述任务。

在流程方面,我认为需要发生这样的事情:将集合保存在驱动程序节点上,并将函数分发给可用于执行 id_generator() 的工作人员,直到有 n 个唯一我的套装中的物品。在 pyspark 中似乎没有 random.choices 的等效函数,所以也许我需要使用 UDF 装饰器在 pyspark 中注册该函数?

这是针对 0,1 而非 从某个项目列表中随机选择的分布。 https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.rand.html

@udf
def id_generator():
  import string
  import random
  def id_generator(size=6, chars=string.ascii_uppercase + string.digits):
    return ''.join(random.choice(chars) for _ in range(size))
  return id_generator()

类似于上面的内容?虽然我仍然不清楚 how/if 是否设置 spark 工作。

以上是正确的想法,虽然我不知道从单个项目 spark 数据帧收集值对于数百万次迭代是个好主意。

该代码在直接 python 上运行良好,但如果可能的话,我想加快几个小时的速度。 (我需要根据各种 rules/list 值生成几个随机生成的列,以从头开始创建数据集)。

*我知道 id_generator() 的大小为 6,有大约 2,176,782,336 种组合 http://mathcentral.uregina.ca/QQ/database/QQ.09.00/churilla1.html 所以重复的机会并不大,但即使没有 set() 要求,我我仍在为将随机选择从列表附加到 pyspark 中的另一个列表的最佳实现而苦苦挣扎。

感谢任何意见!

编辑 这看起来很有希望

如果 Spark 是最佳方式,这实际上取决于您的用例,但是您可以在生成的数据帧上使用函数的 udf 并删除重复项。这种方法的缺点是,由于删除了重复项,因此很难达到您可能需要的确切数据点数。

注意 1:我稍微调整了您的函数以使用 random.choices。

注意 2:如果 运行 在多个节点上,您可能需要确保每个节点使用不同的随机种子。

import string
import random
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

SIZE = 10 ** 6

spark = SparkSession.builder.getOrCreate()

@udf(StringType())
def id_generator(size=6, chars=string.ascii_uppercase + string.digits):
    return ''.join(random.choices(chars, k=size))

df = spark.range(SIZE)

df = df.withColumn('sample', id_generator()).drop('id')

print(f'Count: {df.count()}')
print(f'Unique count: {df.dropDuplicates().count()}')

df.show(5)

给出:

Count: 1000000                                                                  
Unique count: 999783                                                            
+------+
|sample|
+------+
|QTOVIM|
|NEH0SY|
|DJW5Q3|
|WMEKRF|
|OQ09N9|
+------+
only showing top 5 rows