我可以在安装过程中将外部 (R) 进程连接到每个 pyspark worker
Can I connect an external (R) process to each pyspark worker during setup
我想让每个 python 工作人员使用 rpy2 启动 R shell。我可以在某种设置阶段执行此操作,类似于我假设在导入 Python 模块以用于以后的执行程序任务时会发生这种情况吗?例如:
import numpy as np
df.mapPartitions(lambda x: np.zeros(x))
在我的例子中,我想在每个执行程序上启动一个 R shell 并导入 R 库,它看起来像这样:
import rpy2.robjects as robjects
from rpy2.robjects.packages import importr
rlibrary = importr('testrlibrary')
df.mapPartitions(lambda x: rlibrary.rfunc(x))
但我不希望这发生在对 mapPartitions
的调用中,因为那样它会在任务级别发生,而不是每个执行程序核心一次。该方法有效,看起来更像下面的示例,但对我没有用。
def model(partition):
import rpy2.robjects as robjects
from rpy2.robjects.packages import importr
rlibrary = importr('testrlibrary')
rlibrary.rfunc(partition)
df.mapPartitions(model)
像这样的东西应该可以正常工作:
import rpy2.robjects as robjects
from rpy2.robjects.packages import importr
def length_(s):
stringi = importr("stringi")
return stringi.stri_length(s)[0]
sc.parallelize(["foo", "bar", "foobar"]).map(length_)
R
object, which represents R interpreter, is a singleton 所以它只会被初始化一次并且 R 不会 re-import 已经附加库。多次调用 require
会产生一些开销,但与将数据传入和传出 R 的成本相比,它应该可以忽略不计。
如果您想要更复杂的东西,您可以创建自己的 singleton module or use Borg pattern 来处理导入,但这可能有点矫枉过正。
I assume this would happen when you import a python module to be used for later executor tasks
其实要看配置。默认情况下,Spark 在任务之间重用解释器,但可以修改此行为。
我提供了一些示例作为对 的回答。也许你会发现这些有用。
我想让每个 python 工作人员使用 rpy2 启动 R shell。我可以在某种设置阶段执行此操作,类似于我假设在导入 Python 模块以用于以后的执行程序任务时会发生这种情况吗?例如:
import numpy as np
df.mapPartitions(lambda x: np.zeros(x))
在我的例子中,我想在每个执行程序上启动一个 R shell 并导入 R 库,它看起来像这样:
import rpy2.robjects as robjects
from rpy2.robjects.packages import importr
rlibrary = importr('testrlibrary')
df.mapPartitions(lambda x: rlibrary.rfunc(x))
但我不希望这发生在对 mapPartitions
的调用中,因为那样它会在任务级别发生,而不是每个执行程序核心一次。该方法有效,看起来更像下面的示例,但对我没有用。
def model(partition):
import rpy2.robjects as robjects
from rpy2.robjects.packages import importr
rlibrary = importr('testrlibrary')
rlibrary.rfunc(partition)
df.mapPartitions(model)
像这样的东西应该可以正常工作:
import rpy2.robjects as robjects
from rpy2.robjects.packages import importr
def length_(s):
stringi = importr("stringi")
return stringi.stri_length(s)[0]
sc.parallelize(["foo", "bar", "foobar"]).map(length_)
R
object, which represents R interpreter, is a singleton 所以它只会被初始化一次并且 R 不会 re-import 已经附加库。多次调用 require
会产生一些开销,但与将数据传入和传出 R 的成本相比,它应该可以忽略不计。
如果您想要更复杂的东西,您可以创建自己的 singleton module or use Borg pattern 来处理导入,但这可能有点矫枉过正。
I assume this would happen when you import a python module to be used for later executor tasks
其实要看配置。默认情况下,Spark 在任务之间重用解释器,但可以修改此行为。
我提供了一些示例作为对