PySpark 中的随机数生成
Random numbers generation in PySpark
让我们从一个总是 return 随机整数的简单函数开始:
import numpy as np
def f(x):
return np.random.randint(1000)
和一个用零填充并使用 f
:
映射的 RDD
rdd = sc.parallelize([0] * 10).map(f)
由于上面的 RDD 没有持久化,我希望每次收集时我都会得到不同的输出:
> rdd.collect()
[255, 512, 512, 512, 255, 512, 255, 512, 512, 255]
如果我们忽略这样一个事实,即值的分布看起来并不是随机的,它或多或少会发生这种情况。问题开始于我们只取第一个元素时:
assert len(set(rdd.first() for _ in xrange(100))) == 1
或
assert len(set(tuple(rdd.take(1)) for _ in xrange(100))) == 1
似乎每次都return相同的数字。我已经能够在两台装有 Spark 1.2、1.3 和 1.4 的不同机器上重现此行为。我在这里使用 np.random.randint
但它的行为方式与 random.randint
.
相同
这个问题,与 collect
的非完全随机结果一样,似乎是 Python 特定的,我无法使用 Scala 重现它:
def f(x: Int) = scala.util.Random.nextInt(1000)
val rdd = sc.parallelize(List.fill(10)(0)).map(f)
(1 to 100).map(x => rdd.first).toSet.size
rdd.collect()
我是不是漏掉了什么明显的东西?
编辑:
原来问题的根源是 Python RNG 实现。引用 official documentation:
The functions supplied by this module are actually bound methods of a hidden instance of the random.Random class. You can instantiate your own instances of Random to get generators that don’t share state.
我假设 NumPy 以相同的方式工作并使用 RandomState
实例重写 f
如下
import os
import binascii
def f(x, seed=None):
seed = (
seed if seed is not None
else int(binascii.hexlify(os.urandom(4)), 16))
rs = np.random.RandomState(seed)
return rs.randint(1000)
变慢但解决了问题。
虽然上面解释的不是收集的随机结果,但我仍然不明白它如何影响多个操作之间的 first
/ take(1)
。
这似乎是 randint
的错误(或功能)。我看到了相同的行为,但是一旦我更改了 f
,值确实发生了变化。所以,我不确定这种方法的实际随机性....我找不到任何文档,但它似乎使用了一些确定性数学算法,而不是使用 运行 机器的更多可变特征.即使我来回,返回原始值时数字似乎相同...
所以这里的实际问题比较简单。 Python 中的每个子进程都从其 parent:
继承其状态
len(set(sc.parallelize(range(4), 4).map(lambda _: random.getstate()).collect()))
# 1
由于 parent 状态在这种特定情况下没有理由改变并且工人的寿命有限,每个 child 的状态在每个 运行 上将完全相同。
对于我的用例,大部分解决方案都隐藏在问题底部的编辑中。然而,还有另一个复杂的问题:我想使用同一个函数来生成多个(不同的)随机列。事实证明,Spark 假设 UDF 的输出是确定性的,这意味着它可以跳过以后对具有相同输入的相同函数的调用。对于 return 随机输出的函数,这显然不是你想要的。
为了解决这个问题,我使用内置的 PySpark rand
函数为我想要的每个随机列生成了一个单独的种子列:
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType
import numpy as np
@F.udf(IntegerType())
def my_rand(seed):
rs = np.random.RandomState(seed)
return rs.randint(1000)
seed_expr = (F.rand()*F.lit(4294967295).astype('double')).astype('bigint')
my_df = (
my_df
.withColumn('seed_0', seed_expr)
.withColumn('seed_1', seed_expr)
.withColumn('myrand_0', my_rand(F.col('seed_0')))
.withColumn('myrand_1', my_rand(F.col('seed_1')))
.drop('seed_0', 'seed_1')
)
我使用的是 DataFrame API 而不是原始问题的 RDD,因为这是我更熟悉的,但可能适用相同的概念。
注意:显然可以禁用自 v2.3 以来 Scala Spark UDF 的确定性假设:https://jira.apache.org/jira/browse/SPARK-20586.
让我们从一个总是 return 随机整数的简单函数开始:
import numpy as np
def f(x):
return np.random.randint(1000)
和一个用零填充并使用 f
:
rdd = sc.parallelize([0] * 10).map(f)
由于上面的 RDD 没有持久化,我希望每次收集时我都会得到不同的输出:
> rdd.collect()
[255, 512, 512, 512, 255, 512, 255, 512, 512, 255]
如果我们忽略这样一个事实,即值的分布看起来并不是随机的,它或多或少会发生这种情况。问题开始于我们只取第一个元素时:
assert len(set(rdd.first() for _ in xrange(100))) == 1
或
assert len(set(tuple(rdd.take(1)) for _ in xrange(100))) == 1
似乎每次都return相同的数字。我已经能够在两台装有 Spark 1.2、1.3 和 1.4 的不同机器上重现此行为。我在这里使用 np.random.randint
但它的行为方式与 random.randint
.
这个问题,与 collect
的非完全随机结果一样,似乎是 Python 特定的,我无法使用 Scala 重现它:
def f(x: Int) = scala.util.Random.nextInt(1000)
val rdd = sc.parallelize(List.fill(10)(0)).map(f)
(1 to 100).map(x => rdd.first).toSet.size
rdd.collect()
我是不是漏掉了什么明显的东西?
编辑:
原来问题的根源是 Python RNG 实现。引用 official documentation:
The functions supplied by this module are actually bound methods of a hidden instance of the random.Random class. You can instantiate your own instances of Random to get generators that don’t share state.
我假设 NumPy 以相同的方式工作并使用 RandomState
实例重写 f
如下
import os
import binascii
def f(x, seed=None):
seed = (
seed if seed is not None
else int(binascii.hexlify(os.urandom(4)), 16))
rs = np.random.RandomState(seed)
return rs.randint(1000)
变慢但解决了问题。
虽然上面解释的不是收集的随机结果,但我仍然不明白它如何影响多个操作之间的 first
/ take(1)
。
这似乎是 randint
的错误(或功能)。我看到了相同的行为,但是一旦我更改了 f
,值确实发生了变化。所以,我不确定这种方法的实际随机性....我找不到任何文档,但它似乎使用了一些确定性数学算法,而不是使用 运行 机器的更多可变特征.即使我来回,返回原始值时数字似乎相同...
所以这里的实际问题比较简单。 Python 中的每个子进程都从其 parent:
继承其状态len(set(sc.parallelize(range(4), 4).map(lambda _: random.getstate()).collect()))
# 1
由于 parent 状态在这种特定情况下没有理由改变并且工人的寿命有限,每个 child 的状态在每个 运行 上将完全相同。
对于我的用例,大部分解决方案都隐藏在问题底部的编辑中。然而,还有另一个复杂的问题:我想使用同一个函数来生成多个(不同的)随机列。事实证明,Spark 假设 UDF 的输出是确定性的,这意味着它可以跳过以后对具有相同输入的相同函数的调用。对于 return 随机输出的函数,这显然不是你想要的。
为了解决这个问题,我使用内置的 PySpark rand
函数为我想要的每个随机列生成了一个单独的种子列:
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType
import numpy as np
@F.udf(IntegerType())
def my_rand(seed):
rs = np.random.RandomState(seed)
return rs.randint(1000)
seed_expr = (F.rand()*F.lit(4294967295).astype('double')).astype('bigint')
my_df = (
my_df
.withColumn('seed_0', seed_expr)
.withColumn('seed_1', seed_expr)
.withColumn('myrand_0', my_rand(F.col('seed_0')))
.withColumn('myrand_1', my_rand(F.col('seed_1')))
.drop('seed_0', 'seed_1')
)
我使用的是 DataFrame API 而不是原始问题的 RDD,因为这是我更熟悉的,但可能适用相同的概念。
注意:显然可以禁用自 v2.3 以来 Scala Spark UDF 的确定性假设:https://jira.apache.org/jira/browse/SPARK-20586.