运行 并行使用多处理的 rpy2 引发无法捕获的奇怪异常

Running rpy2 in parallel using multiprocessing raises weird exception that cannot be caught

所以这是一个我无法解决的问题,我也不知道制作 MCVE 的好方法。本质上,它已被简要讨论 here,但正如评论所示,存在一些分歧,最终裁决仍未出炉。因此,我再次发布类似的问题,希望得到更好的答案。


我有来自几千个传感器的传感器数据,我每分钟都会获得这些数据。我的兴趣在于预测数据。为此,我使用 ARIMA 系列预测模型。长话短说,在与我的研究小组的其他成员讨论后,我们决定使用 R 包 forecast 中可用的 Arima 函数,而不是相同的 statsmodels 实现。


因为我有来自几千个传感器的数据,为此我想至少分析一整周的数据(首先),并且由于一周有 7 天,所以我有 7 倍的数据我的传感器数据数量。本质上是一些 14k 传感器天组合。找到最佳 ARIMA 阶数(最小化 BIC)并预测下一周的数据对于每个传感器天组合大约需要 1 分钟。这意味着需要超过 11 天才能在单个核心上处理一周的数据!

这显然是一种浪费,因为我还有 15 个核心一直闲置着。所以,很明显,这是并行处理的问题。请注意,每个传感器日组合不会影响任何其他传感器日组合。此外,我的其余代码都经过了很好的分析和优化。



Exception in thread Thread-3:
Traceback (most recent call last):
  File "/home/kartik/miniconda3/lib/python3.5/threading.py", line 914, in _bootstrap_inner
  File "/home/kartik/miniconda3/lib/python3.5/threading.py", line 862, in run
    self._target(*self._args, **self._kwargs)
  File "/home/kartik/miniconda3/lib/python3.5/multiprocessing/pool.py", line 429, in _handle_results
    task = get()
  File "/home/kartik/miniconda3/lib/python3.5/multiprocessing/connection.py", line 251, in recv
    return ForkingPickler.loads(buf.getbuffer())
  File "/home/kartik/miniconda3/lib/python3.5/site-packages/rpy2/robjects/robject.py", line 55, in _reduce_robjectmixin
    rinterface_level=rinterface_factory(rdumps, rtypeof)
ValueError: Mismatch between the serialized object and the expected R type (expected 6 but got 24)


  1. rpy2包中提出
  2. 它与线程 3 有关。由于 Python 是零索引,我猜这是第四个线程。因此,4x6 = 24,加起来就是最终错误语句中显示的数字
  3. rpy2 在我的代码中仅在一个地方使用,它可能必须将返回值重新编码为 Python 类型。保护 try: ... except: ... 中的该行不会捕获该异常
  4. 当我放弃多处理并在循环中调用函数时没有引发异常
  5. 异常不会使程序崩溃,只是永远挂起它(直到我 Ctrl+C 终止它)
  6. 到目前为止我尝试的所有方法都无法解决错误


我尝试了一切,从极端的过程编码,用函数来处理最少的情况(即只有一个函数被并行调用),到极端的封装,其中可执行块在 if __name__ == '__main__':调用一个读取数据的函数,进行必要的分组,然后将组传递给另一个函数,该函数导入 multiprocessing 并并行调用另一个函数,该函数导入导入 rpy2 的处理模块,以及将数据传递给 R.

中的 Arima 函数

基本上,如果 rpy2 在函数嵌套的深处被调用和初始化,以至于它不知道另一个实例可能被初始化,或者它是否被全局调用和初始化一次,都没有关系,如果涉及 multiprocessing,则会引发错误。



import numpy as np
import pandas as pd

def arima_select(y, order):
    from rpy2 import robjects as ro
    from rpy2.robjects.packages import importr
    from rpy2.robjects import pandas2ri
    forecast = importr('forecast')

    res = forecast.Arima(y, order=ro.FloatVector(order))
    return res

def arima_wrapper(data):
    data = data[['tstamp', 'val']]
    data.set_index('tstamp', inplace=True)
    return arima_select(data, (1,1,1))

def applyParallel(groups, func):
    from multiprocessing import Pool, cpu_count
    with Pool(cpu_count()) as p:
        ret_list = p.map(func, [group for _, group in groups])
    return pd.concat(ret_list, keys=[name for name, _ in groups])

def wrapper():
    df = pd.read_csv('file.csv', parse_dates=[1], infer_datetime_format=True)
    df['day'] = df['tstamp'].dt.day
    res = applyParallel(df.groupby(['sensor', 'day']), arima_wrapper)



这是上面伪代码中arima_wrapperdata.set_index('tstamp', inplace=True)的紧下方print(data.head(6))的输出:


def data_gen(start_day):
    r = pd.Series(pd.date_range('2016-09-{}'.format(str(start_day)),
                                periods=24*60, freq='T'),
    d = pd.Series(np.random.randint(10, 80, 1440), name='val')
    s = pd.Series(['sensor1']*1440, name='sensor')
    return pd.concat([s, r, d], axis=1)
df = pd.concat([data_gen(day) for day in range(1,8)], ignore_index=True)


第一个观察结果是这个错误仅在涉及 multiprocessing 时出现,而不是在循环中调用函数 (arima_wrapper) 时出现。因此,它必须以某种方式与多处理问题相关联。 R 对多进程不是很友好,但是当以伪代码所示的方式编写时,R 的每个实例不应该知道其他实例的存在。

根据伪代码的结构方式,multiprocessing 产生的多个子进程中的每个调用都必须初始化 rpy2。如果那是真的,rpy2 的每个实例都应该产生自己的 R 实例,它应该只执行一个函数,然后终止。这不会引发任何错误,因为它类似于单线程操作。我的理解是否准确,或者我完全或部分地忽略了这一点?

如果 rpy2 的所有实例都以某种方式共享 R 的一个实例,那么我可能会合理地预料到该错误。什么是真的:R 是在 rpy2 的所有实例之间共享,还是每个 rpy2 实例都有一个 R 实例?


因为 SO 讨厌包含多个问题的问题线程,所以我会优先考虑我的问题,以便接受部分答案。这是我的优先列表:

  1. 如何解决这个问题?没有提出问题的工作代码示例将被接受为答案,即使它没有回答任何其他问题,前提是没有其他答案做得更好,或者更早发布。
  2. 我对 Python 导入的理解是否准确,或者我是否遗漏了有关 R 的多个实例的要点?如果我错了,我应该如何编辑导入语句以便在每个子流程中创建一个新实例?这个问题的答案很可能会给我指明一个可能的解决方案,并且会被接受,前提是没有更好的答案,或者更早发布
  3. R 是在 rpy2 的所有实例之间共享还是每个 rpy2 实例都有一个 R 实例?此问题的答案只有在解决问题时才会被接受。

Python packages/modules 是 "uniquely" 在您的流程中导入的,这意味着流程中使用 package/module 的所有代码都使用相同的单一导入(您不在给定的块中每个 import 都有一个副本)。

因此,我建议在创建池时使用初始化函数,而不是在每次将任务发送给工作人员时重复导入 rpy2 和设置转换。如果每个任务都很短,您也可能会提高性能。

def arima_select(y, order):
    # FIXME: check whether the rpy2.robjects package
    #        should be (re) imported as ro to be visible          
    res = forecast.Arima(y, order=ro.FloatVector(order))
    return res

forecast = None

def worker_init():
    from rpy2 import robjects as ro
    from rpy2.robjects.packages import importr
    from rpy2.robjects import pandas2ri
    global forecast
    forecast = importr('forecast')

def applyParallel(groups, func):
    from multiprocessing import Pool, cpu_count
    with Pool(cpu_count(), worker_init) as p:
        ret_list = p.map(func, [group for _, group in groups])
    return pd.concat(ret_list, keys=[name for name, _ in groups])
  1. Is R shared among all instances of rpy2 or is there an instance of R for each instance of rpy2? Answers to this question will be accepted only if they lead to a resolution of the problem.

rpy2 通过链接其 C 共享库使 R 可用。一个这样的库 per Python process,这是一个有状态的库(R 无法处理并发)。我认为您的问题与对象序列化(参见 http://rpy2.readthedocs.io/en/version_2.8.x/robjects_serialization.html#object-serialization)有关,而不是与并发有关。

在 Python 腌制 rpy2 对象后重建 R 对象时,发生了一些明显的混乱。更具体地说,当查看错误消息中提到的 R 对象类型时:

>>> from rpy2.rinterface import str_typeint
>>> str_typeint(6)
>>> str_typeint(24)

我猜测 return 由 forecast.Arima 编辑的 R 对象包含一个未计算的 R 表达式(例如导致该结果对象的调用)并且在序列化和反序列化时它会返回作为不同的东西(字节的原始向量)。这可能是 R 自己的序列化机制的错误(因为 rpy2 在后台使用它)。现在,为了解决您的问题,您可能想要从工作人员的函数调用 运行 中提取您最关心的 forecast.Arima 并且仅 return 的内容。

对问题工作中提供的伪代码中的 arima_select 函数进行了以下更改:

import numpy as np
import pandas as pd
from rpy2 import rinterface as ri


def arima_select(y, order):

    def rimport(packname):
        as_environment = ri.baseenv['as.environment']
        require = ri.baseenv['require']
                quiet = ri.BoolSexpVector((True, )))
        packname = ri.StrSexpVector(['package:' + str(packname)])
        pack_env = as_environment(packname)
        return pack_env

    frcst = rimport("forecast")
    args = (('y', ri.FloatSexpVector(y)),
            ('order', ri.FloatSexpVector(order)),
            ('include.constant', ri.StrSexpVector(const)))
    return frcst['Arima'].rcall(args, ri.globalenv)


import numpy as np
import pandas as pd
from rpy2 import rinterface as ri


def arima(y, order=(1,1,1)):
    # This is the same as arima_select above, just renamed to arima

def applyParallel(groups, func):
    from multiprocessing import Pool, cpu_count
    with Pool(cpu_count(), worker_init) as p:
        ret_list = p.map(func, [group for _, group in groups])
    return pd.concat(ret_list, keys=[name for name, _ in groups])

def main():
    # Create your df in your favorite way:
    def data_gen(start_day):
        r = pd.Series(pd.date_range('2016-09-{}'.format(str(start_day)),
                                    periods=24*60, freq='T'),
        d = pd.Series(np.random.randint(10, 80, 1440), name='val')
        s = pd.Series(['sensor1']*1440, name='sensor')
        return pd.concat([s, r, d], axis=1)
    df = pd.concat([data_gen(day) for day in range(1,8)], ignore_index=True)

    applyParallel(df.groupby(['sensor', pd.Grouper(key='tstamp', freq='D')]),
                  arima) # Note one may use partial from functools to pass order to arima

请注意,我也不直接从 applyParallel 调用 arima,因为我的目标是为给定系列(传感器和日期)找到最佳模型。我使用函数 arima_wrapper 遍历订单组合,并在每次迭代时调用 arima