我可以在安装过程中将外部 (R) 进程连接到每个 pyspark worker

Can I connect an external (R) process to each pyspark worker during setup

我想让每个 python 工作人员使用 rpy2 启动 R shell。我可以在某种设置阶段执行此操作,类似于我假设在导入 Python 模块以用于以后的执行程序任务时会发生这种情况吗?例如:

import numpy as np

df.mapPartitions(lambda x: np.zeros(x))

在我的例子中,我想在每个执行程序上启动一个 R shell 并导入 R 库,它看起来像这样:

import rpy2.robjects as robjects
from  rpy2.robjects.packages import importr
rlibrary = importr('testrlibrary')

df.mapPartitions(lambda x: rlibrary.rfunc(x))

但我不希望这发生在对 mapPartitions 的调用中,因为那样它会在任务级别发生,而不是每个执行程序核心一次。该方法有效,看起来更像下面的示例,但对我没有用。

def model(partition):
    import rpy2.robjects as robjects
    from  rpy2.robjects.packages import importr
    rlibrary = importr('testrlibrary')
    rlibrary.rfunc(partition)

df.mapPartitions(model)

像这样的东西应该可以正常工作:

import rpy2.robjects as robjects
from  rpy2.robjects.packages import importr

def length_(s):
    stringi = importr("stringi")  
    return stringi.stri_length(s)[0]

sc.parallelize(["foo", "bar", "foobar"]).map(length_)

R object, which represents R interpreter, is a singleton 所以它只会被初始化一次并且 R 不会 re-import 已经附加库。多次调用 require 会产生一些开销,但与将数据传入和传出 R 的成本相比,它应该可以忽略不计。

如果您想要更复杂的东西,您可以创建自己的 singleton module or use Borg pattern 来处理导入,但这可能有点矫枉过正。

I assume this would happen when you import a python module to be used for later executor tasks

其实要看配置。默认情况下,Spark 在任务之间重用解释器,但可以修改此行为。

我提供了一些示例作为对 的回答。也许你会发现这些有用。