如何对包含 R 函数的 pyspark RDD 进行分区

How can I partition pyspark RDDs holding R functions

import rpy2.robjects as robjects

dffunc = sc.parallelize([(0,robjects.r.rnorm),(1,robjects.r.runif)])
dffunc.collect() 

输出

[(0, <rpy2.rinterface.SexpClosure - Python:0x7f2ecfc28618 / R:0x26abd18>), (1, <rpy2.rinterface.SexpClosure - Python:0x7f2ecfc283d8 / R:0x26aad28>)]

分区版本报错时:

dffuncpart = dffunc.partitionBy(2)
dffuncpart.collect()

RuntimeError: ('R cannot evaluate code before being initialized.', <built-in function unserialize>

这个错误似乎是 R 没有加载到其中一个分区上,我认为这意味着没有执行第一个导入步骤。有没有办法解决?

编辑 1 第二个例子让我认为 pyspark 或 rpy2 的时间存在错误。

dffunc = sc.parallelize([(0,robjects.r.rnorm),     (1,robjects.r.runif)]).partitionBy(2)
def loadmodel(model):
    import rpy2.robjects as robjects
    return model[1](2)
dffunc.map(loadmodel).collect()

产生相同的错误 R cannot evaluate code before being initialized。

dffuncpickle = sc.parallelize([(0,pickle.dumps(robjects.r.rnorm)),(1,pickle.dumps(robjects.r.runif))]).partitionBy(2)
def loadmodelpickle(model):
    import rpy2.robjects as robjects
    import pickle
    return pickle.loads(model[1])(2)
dffuncpickle.map(loadmodelpickle).collect()

按预期工作。

我想说 "this is not a bug in rpy2, this is a feature" 但实际上我不得不接受 "this is a limitation"。

发生的事情是 rpy2 有 2 interface levels。一个是 low-level 一个(更接近 R 的 C API)并且可以通过 rpy2.rinterface 获得,另一个是 high-level 界面,有更多的花里胡哨,更多 "pythonic",并且对于从 rinterface level-ones 继承的 R 对象使用 classes(最后一部分对于下面关于酸洗的部分很重要)。导入 high-level 接口会导致在必要时使用默认参数初始化(启动)嵌入式 R。导入 low-level 接口 rinterface 没有这种副作用,嵌入式 R 的初始化必须显式执行(函数 initr)。 rpy2 是这样设计的,因为嵌入式 R 的初始化可以有参数:首先导入 rpy2.rinterface,设置初始化,然后导入 rpy2.robjects 使这成为可能。

除此之外,由 rpy2 包装的 R 对象的序列化(pickling)目前仅定义在 rinterface 级别(参见 documentation)。酸洗 robjects-level (high-level) rpy2 对象使用 rinterface-level 代码,当对它们进行 unpickling 时,它们将保持在 lower-level(Python pickle包含对象的 class 定义的模块,并将导入该模块 - 这里 rinterface,这并不意味着嵌入式 R 的初始化)。事情变成这样的原因很简单,就是 "good enough for now":在实现这个的时候,我不得不同时想出一个很好的方法来连接两种不同的语言,并通过 Python C-API 和 pickling/unpickling Python 个对象。鉴于可以轻松编写类似

的内容
import rpy2.robjects

import rpy2.rinterface
rpy2.rinterface.initr()

在 unpickling 之前,从未重新访问过。我所知道的 rpy2 酸洗的用途是使用 Python 的 multiprocessing(并在初始化子进程的代码中添加类似于 import 语句的内容是一种便宜且足够的修复方法)。愿现在是时候再看看这个了。如果是的话,请为 rpy2 提交错误报告。

编辑:这无疑是rpy2的问题。腌制的 robjects 级别的对象应该退回 robjects 级别,而不是 rinterface 级别。我已经打开了一个 issue in the rpy2 tracker(并且已经在 default/dev 分支中推送了一个基​​本补丁)。

第二次编辑: 该补丁是从版本 2.7.7 开始发布的 rpy2 的一部分(撰写本文时的最新版本是 2.7.8)。