PySpark:范围内的随机数(基于列)

PySpark: random number from range (based on a column)

我试图生成一个列,每行都有一个随机数,但这个数字必须在现有列和 -1 之间的范围内。如果我有:

customer existing_value 
   A          -15
   B          -9
   C          -13

我想收到类似 rand(existing_value, -1):

customer existing_value random_value
   A          -15            -3
   B          -9             -8
   C          -13            -6

我找不到专门的 PySpark 解决方案,sql rand() 功能似乎太有限了。我尝试使用以下代码,但该函数不接受列作为输入:

random_month.withColumn('random', randint(col('existing_value'), -1))

什么是好的解决方案?行数约为 100k,因此如果 PySpark 中没有合适的内容,如有必要,可以选择 pandas。

您可以将 randint 函数与 UDF 一起使用:

from pyspark.sql import functions as F

df = spark.createDataFrame([("A", -15), ("B", -9), ("C", -13), ], ["customer", "existing_value"])

df1 = df.withColumn("random_value", F.udf(lambda x: randint(x, -1))("existing_value"))

df1.show()
#+--------+--------------+------------+
#|customer|existing_value|random_value|
#+--------+--------------+------------+
#|       A|           -15|          -5|
#|       B|            -9|          -7|
#|       C|           -13|          -3|
#+--------+--------------+------------+

另一种解决方案是生成从 existing_value-1 的数字序列,然后从结果数组中随机选择一个元素:

df1 = df.withColumn(
    "random_value",
    F.expr("sequence(existing_value, -1, 1)")
).withColumn(
    "random_value",
    F.col("random_value")[F.floor(F.rand() * F.size("random_value"))]
)

使用 pandas' 应用函数对列进行此类操作:https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.apply.html。 一种可能的解决方案如下:

import pandas as pd
import random

df = pd.DataFrame([-15, -20], columns=['existing_value'])
df['random_value'] = df.existing_value.apply(lambda row: random.randint(row,-1))
print(df)