PySpark 中的随机数生成

Random numbers generation in PySpark

让我们从一个总是 return 随机整数的简单函数开始:

import numpy as np

def f(x):
    return np.random.randint(1000)

和一个用零填充并使用 f:

映射的 RDD
rdd = sc.parallelize([0] * 10).map(f)

由于上面的 RDD 没有持久化,我希望每次收集时我都会得到不同的输出:

> rdd.collect()
[255, 512, 512, 512, 255, 512, 255, 512, 512, 255]

如果我们忽略这样一个事实,即值的分布看起来并不是随机的,它或多或少会发生这种情况。问题开始于我们只取第一个元素时:

assert len(set(rdd.first() for _ in xrange(100))) == 1

assert len(set(tuple(rdd.take(1)) for _ in xrange(100))) == 1

似乎每次都return相同的数字。我已经能够在两台装有 Spark 1.2、1.3 和 1.4 的不同机器上重现此行为。我在这里使用 np.random.randint 但它的行为方式与 random.randint.

相同

这个问题,与 collect 的非完全随机结果一样,似乎是 Python 特定的,我无法使用 Scala 重现它:

def f(x: Int) = scala.util.Random.nextInt(1000)

val rdd = sc.parallelize(List.fill(10)(0)).map(f)
(1 to 100).map(x => rdd.first).toSet.size

rdd.collect()

我是不是漏掉了什么明显的东西?

编辑:

原来问题的根源是 Python RNG 实现。引用 official documentation:

The functions supplied by this module are actually bound methods of a hidden instance of the random.Random class. You can instantiate your own instances of Random to get generators that don’t share state.

我假设 NumPy 以相同的方式工作并使用 RandomState 实例重写 f 如下

import os
import binascii

def f(x, seed=None):
    seed = (
        seed if seed is not None 
        else int(binascii.hexlify(os.urandom(4)), 16))
    rs = np.random.RandomState(seed)
    return rs.randint(1000)

变慢但解决了问题。

虽然上面解释的不是收集的随机结果,但我仍然不明白它如何影响多个操作之间的 first / take(1)

这似乎是 randint 的错误(或功能)。我看到了相同的行为,但是一旦我更改了 f,值确实发生了变化。所以,我不确定这种方法的实际随机性....我找不到任何文档,但它似乎使用了一些确定性数学算法,而不是使用 运行 机器的更多可变特征.即使我来回,返回原始值时数字似乎相同...

所以这里的实际问题比较简单。 Python 中的每个子进程都从其 parent:

继承其状态
len(set(sc.parallelize(range(4), 4).map(lambda _: random.getstate()).collect()))
# 1

由于 parent 状态在这种特定情况下没有理由改变并且工人的寿命有限,每个 child 的状态在每个 运行 上将完全相同。

对于我的用例,大部分解决方案都隐藏在问题底部的编辑中。然而,还有另一个复杂的问题:我想使用同一个函数来生成多个(不同的)随机列。事实证明,Spark 假设 UDF 的输出是确定性的,这意味着它可以跳过以后对具有相同输入的相同函数的调用。对于 return 随机输出的函数,这显然不是你想要的。

为了解决这个问题,我使用内置的 PySpark rand 函数为我想要的每个随机列生成了一个单独的种子列:

import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType
import numpy as np

@F.udf(IntegerType())
def my_rand(seed):
    rs = np.random.RandomState(seed)
    return rs.randint(1000)

seed_expr = (F.rand()*F.lit(4294967295).astype('double')).astype('bigint')
my_df = (
    my_df
    .withColumn('seed_0', seed_expr)
    .withColumn('seed_1', seed_expr)
    .withColumn('myrand_0', my_rand(F.col('seed_0')))
    .withColumn('myrand_1', my_rand(F.col('seed_1')))
    .drop('seed_0', 'seed_1')
)

我使用的是 DataFrame API 而不是原始问题的 RDD,因为这是我更熟悉的,但可能适用相同的概念。

注意:显然可以禁用自 v2.3 以来 Scala Spark UDF 的确定性假设:https://jira.apache.org/jira/browse/SPARK-20586.