如何对包含 R 函数的 pyspark RDD 进行分区
How can I partition pyspark RDDs holding R functions
import rpy2.robjects as robjects
dffunc = sc.parallelize([(0,robjects.r.rnorm),(1,robjects.r.runif)])
dffunc.collect()
输出
[(0, <rpy2.rinterface.SexpClosure - Python:0x7f2ecfc28618 / R:0x26abd18>), (1, <rpy2.rinterface.SexpClosure - Python:0x7f2ecfc283d8 / R:0x26aad28>)]
分区版本报错时:
dffuncpart = dffunc.partitionBy(2)
dffuncpart.collect()
RuntimeError: ('R cannot evaluate code before being initialized.', <built-in function unserialize>
这个错误似乎是 R
没有加载到其中一个分区上,我认为这意味着没有执行第一个导入步骤。有没有办法解决?
编辑 1 第二个例子让我认为 pyspark 或 rpy2 的时间存在错误。
dffunc = sc.parallelize([(0,robjects.r.rnorm), (1,robjects.r.runif)]).partitionBy(2)
def loadmodel(model):
import rpy2.robjects as robjects
return model[1](2)
dffunc.map(loadmodel).collect()
产生相同的错误 R cannot evaluate code before being initialized。
dffuncpickle = sc.parallelize([(0,pickle.dumps(robjects.r.rnorm)),(1,pickle.dumps(robjects.r.runif))]).partitionBy(2)
def loadmodelpickle(model):
import rpy2.robjects as robjects
import pickle
return pickle.loads(model[1])(2)
dffuncpickle.map(loadmodelpickle).collect()
按预期工作。
我想说 "this is not a bug in rpy2, this is a feature" 但实际上我不得不接受 "this is a limitation"。
发生的事情是 rpy2 有 2 interface levels。一个是 low-level 一个(更接近 R 的 C API)并且可以通过 rpy2.rinterface
获得,另一个是 high-level 界面,有更多的花里胡哨,更多 "pythonic",并且对于从 rinterface
level-ones 继承的 R 对象使用 classes(最后一部分对于下面关于酸洗的部分很重要)。导入 high-level 接口会导致在必要时使用默认参数初始化(启动)嵌入式 R。导入 low-level 接口 rinterface
没有这种副作用,嵌入式 R 的初始化必须显式执行(函数 initr
)。 rpy2 是这样设计的,因为嵌入式 R 的初始化可以有参数:首先导入 rpy2.rinterface
,设置初始化,然后导入 rpy2.robjects
使这成为可能。
除此之外,由 rpy2 包装的 R 对象的序列化(pickling)目前仅定义在 rinterface
级别(参见 documentation)。酸洗 robjects
-level (high-level) rpy2 对象使用 rinterface
-level 代码,当对它们进行 unpickling 时,它们将保持在 lower-level(Python pickle包含对象的 class 定义的模块,并将导入该模块 - 这里 rinterface
,这并不意味着嵌入式 R 的初始化)。事情变成这样的原因很简单,就是 "good enough for now":在实现这个的时候,我不得不同时想出一个很好的方法来连接两种不同的语言,并通过 Python C-API 和 pickling/unpickling Python 个对象。鉴于可以轻松编写类似
的内容
import rpy2.robjects
或
import rpy2.rinterface
rpy2.rinterface.initr()
在 unpickling 之前,从未重新访问过。我所知道的 rpy2 酸洗的用途是使用 Python 的 multiprocessing
(并在初始化子进程的代码中添加类似于 import 语句的内容是一种便宜且足够的修复方法)。愿现在是时候再看看这个了。如果是的话,请为 rpy2 提交错误报告。
编辑:这无疑是rpy2的问题。腌制的 robjects
级别的对象应该退回 robjects
级别,而不是 rinterface
级别。我已经打开了一个 issue in the rpy2 tracker(并且已经在 default/dev 分支中推送了一个基本补丁)。
第二次编辑: 该补丁是从版本 2.7.7 开始发布的 rpy2 的一部分(撰写本文时的最新版本是 2.7.8)。
import rpy2.robjects as robjects
dffunc = sc.parallelize([(0,robjects.r.rnorm),(1,robjects.r.runif)])
dffunc.collect()
输出
[(0, <rpy2.rinterface.SexpClosure - Python:0x7f2ecfc28618 / R:0x26abd18>), (1, <rpy2.rinterface.SexpClosure - Python:0x7f2ecfc283d8 / R:0x26aad28>)]
分区版本报错时:
dffuncpart = dffunc.partitionBy(2)
dffuncpart.collect()
RuntimeError: ('R cannot evaluate code before being initialized.', <built-in function unserialize>
这个错误似乎是 R
没有加载到其中一个分区上,我认为这意味着没有执行第一个导入步骤。有没有办法解决?
编辑 1 第二个例子让我认为 pyspark 或 rpy2 的时间存在错误。
dffunc = sc.parallelize([(0,robjects.r.rnorm), (1,robjects.r.runif)]).partitionBy(2)
def loadmodel(model):
import rpy2.robjects as robjects
return model[1](2)
dffunc.map(loadmodel).collect()
产生相同的错误 R cannot evaluate code before being initialized。
dffuncpickle = sc.parallelize([(0,pickle.dumps(robjects.r.rnorm)),(1,pickle.dumps(robjects.r.runif))]).partitionBy(2)
def loadmodelpickle(model):
import rpy2.robjects as robjects
import pickle
return pickle.loads(model[1])(2)
dffuncpickle.map(loadmodelpickle).collect()
按预期工作。
我想说 "this is not a bug in rpy2, this is a feature" 但实际上我不得不接受 "this is a limitation"。
发生的事情是 rpy2 有 2 interface levels。一个是 low-level 一个(更接近 R 的 C API)并且可以通过 rpy2.rinterface
获得,另一个是 high-level 界面,有更多的花里胡哨,更多 "pythonic",并且对于从 rinterface
level-ones 继承的 R 对象使用 classes(最后一部分对于下面关于酸洗的部分很重要)。导入 high-level 接口会导致在必要时使用默认参数初始化(启动)嵌入式 R。导入 low-level 接口 rinterface
没有这种副作用,嵌入式 R 的初始化必须显式执行(函数 initr
)。 rpy2 是这样设计的,因为嵌入式 R 的初始化可以有参数:首先导入 rpy2.rinterface
,设置初始化,然后导入 rpy2.robjects
使这成为可能。
除此之外,由 rpy2 包装的 R 对象的序列化(pickling)目前仅定义在 rinterface
级别(参见 documentation)。酸洗 robjects
-level (high-level) rpy2 对象使用 rinterface
-level 代码,当对它们进行 unpickling 时,它们将保持在 lower-level(Python pickle包含对象的 class 定义的模块,并将导入该模块 - 这里 rinterface
,这并不意味着嵌入式 R 的初始化)。事情变成这样的原因很简单,就是 "good enough for now":在实现这个的时候,我不得不同时想出一个很好的方法来连接两种不同的语言,并通过 Python C-API 和 pickling/unpickling Python 个对象。鉴于可以轻松编写类似
import rpy2.robjects
或
import rpy2.rinterface
rpy2.rinterface.initr()
在 unpickling 之前,从未重新访问过。我所知道的 rpy2 酸洗的用途是使用 Python 的 multiprocessing
(并在初始化子进程的代码中添加类似于 import 语句的内容是一种便宜且足够的修复方法)。愿现在是时候再看看这个了。如果是的话,请为 rpy2 提交错误报告。
编辑:这无疑是rpy2的问题。腌制的 robjects
级别的对象应该退回 robjects
级别,而不是 rinterface
级别。我已经打开了一个 issue in the rpy2 tracker(并且已经在 default/dev 分支中推送了一个基本补丁)。
第二次编辑: 该补丁是从版本 2.7.7 开始发布的 rpy2 的一部分(撰写本文时的最新版本是 2.7.8)。