运行 并行使用多处理的 rpy2 引发无法捕获的奇怪异常

Running rpy2 in parallel using multiprocessing raises weird exception that cannot be caught

所以这是一个我无法解决的问题,我也不知道制作 MCVE 的好方法。本质上,它已被简要讨论 here,但正如评论所示,存在一些分歧,最终裁决仍未出炉。因此,我再次发布类似的问题,希望得到更好的答案。

背景

我有来自几千个传感器的传感器数据,我每分钟都会获得这些数据。我的兴趣在于预测数据。为此,我使用 ARIMA 系列预测模型。长话短说,在与我的研究小组的其他成员讨论后,我们决定使用 R 包 forecast 中可用的 Arima 函数,而不是相同的 statsmodels 实现。

问题定义

因为我有来自几千个传感器的数据,为此我想至少分析一整周的数据(首先),并且由于一周有 7 天,所以我有 7 倍的数据我的传感器数据数量。本质上是一些 14k 传感器天组合。找到最佳 ARIMA 阶数(最小化 BIC)并预测下一周的数据对于每个传感器天组合大约需要 1 分钟。这意味着需要超过 11 天才能在单个核心上处理一周的数据!

这显然是一种浪费,因为我还有 15 个核心一直闲置着。所以,很明显,这是并行处理的问题。请注意,每个传感器日组合不会影响任何其他传感器日组合。此外,我的其余代码都经过了很好的分析和优化。

问题

问题是我收到了一个我无法在任何地方捕捉到的奇怪错误。这是重现的错误:

Exception in thread Thread-3:
Traceback (most recent call last):
  File "/home/kartik/miniconda3/lib/python3.5/threading.py", line 914, in _bootstrap_inner
    self.run()
  File "/home/kartik/miniconda3/lib/python3.5/threading.py", line 862, in run
    self._target(*self._args, **self._kwargs)
  File "/home/kartik/miniconda3/lib/python3.5/multiprocessing/pool.py", line 429, in _handle_results
    task = get()
  File "/home/kartik/miniconda3/lib/python3.5/multiprocessing/connection.py", line 251, in recv
    return ForkingPickler.loads(buf.getbuffer())
  File "/home/kartik/miniconda3/lib/python3.5/site-packages/rpy2/robjects/robject.py", line 55, in _reduce_robjectmixin
    rinterface_level=rinterface_factory(rdumps, rtypeof)
ValueError: Mismatch between the serialized object and the expected R type (expected 6 but got 24)

以下是我发现的此错误的一些特征:

  1. rpy2包中提出
  2. 它与线程 3 有关。由于 Python 是零索引,我猜这是第四个线程。因此,4x6 = 24,加起来就是最终错误语句中显示的数字
  3. rpy2 在我的代码中仅在一个地方使用,它可能必须将返回值重新编码为 Python 类型。保护 try: ... except: ... 中的该行不会捕获该异常
  4. 当我放弃多处理并在循环中调用函数时没有引发异常
  5. 异常不会使程序崩溃,只是永远挂起它(直到我 Ctrl+C 终止它)
  6. 到目前为止我尝试的所有方法都无法解决错误

尝试过的东西

我尝试了一切,从极端的过程编码,用函数来处理最少的情况(即只有一个函数被并行调用),到极端的封装,其中可执行块在 if __name__ == '__main__':调用一个读取数据的函数,进行必要的分组,然后将组传递给另一个函数,该函数导入 multiprocessing 并并行调用另一个函数,该函数导入导入 rpy2 的处理模块,以及将数据传递给 R.

中的 Arima 函数

基本上,如果 rpy2 在函数嵌套的深处被调用和初始化,以至于它不知道另一个实例可能被初始化,或者它是否被全局调用和初始化一次,都没有关系,如果涉及 multiprocessing,则会引发错误。

伪代码

这里尝试至少提供一些基本的伪代码,以便可以重现错误。

import numpy as np
import pandas as pd

def arima_select(y, order):
    from rpy2 import robjects as ro
    from rpy2.robjects.packages import importr
    from rpy2.robjects import pandas2ri
    pandas2ri.activate()
    forecast = importr('forecast')

    res = forecast.Arima(y, order=ro.FloatVector(order))
    return res

def arima_wrapper(data):
    data = data[['tstamp', 'val']]
    data.set_index('tstamp', inplace=True)
    return arima_select(data, (1,1,1))

def applyParallel(groups, func):
    from multiprocessing import Pool, cpu_count
    with Pool(cpu_count()) as p:
        ret_list = p.map(func, [group for _, group in groups])
    return pd.concat(ret_list, keys=[name for name, _ in groups])

def wrapper():
    df = pd.read_csv('file.csv', parse_dates=[1], infer_datetime_format=True)
    df['day'] = df['tstamp'].dt.day
    res = applyParallel(df.groupby(['sensor', 'day']), arima_wrapper)
    print(res)

显然,上面的代码可以进一步封装,但我认为它应该能相当准确地重现错误。

数据样本

这是上面伪代码中arima_wrapperdata.set_index('tstamp', inplace=True)的紧下方print(data.head(6))的输出:

或者,传感器整周的数据可以简单地通过以下方式生成:

def data_gen(start_day):
    r = pd.Series(pd.date_range('2016-09-{}'.format(str(start_day)),
                                periods=24*60, freq='T'),
                  name='tstamp')
    d = pd.Series(np.random.randint(10, 80, 1440), name='val')
    s = pd.Series(['sensor1']*1440, name='sensor')
    return pd.concat([s, r, d], axis=1)
df = pd.concat([data_gen(day) for day in range(1,8)], ignore_index=True)

观察和问题

第一个观察结果是这个错误仅在涉及 multiprocessing 时出现,而不是在循环中调用函数 (arima_wrapper) 时出现。因此,它必须以某种方式与多处理问题相关联。 R 对多进程不是很友好,但是当以伪代码所示的方式编写时,R 的每个实例不应该知道其他实例的存在。

根据伪代码的结构方式,multiprocessing 产生的多个子进程中的每个调用都必须初始化 rpy2。如果那是真的,rpy2 的每个实例都应该产生自己的 R 实例,它应该只执行一个函数,然后终止。这不会引发任何错误,因为它类似于单线程操作。我的理解是否准确,或者我完全或部分地忽略了这一点?

如果 rpy2 的所有实例都以某种方式共享 R 的一个实例,那么我可能会合理地预料到该错误。什么是真的:R 是在 rpy2 的所有实例之间共享,还是每个 rpy2 实例都有一个 R 实例?

如何解决这个问题?

因为 SO 讨厌包含多个问题的问题线程,所以我会优先考虑我的问题,以便接受部分答案。这是我的优先列表:

  1. 如何解决这个问题?没有提出问题的工作代码示例将被接受为答案,即使它没有回答任何其他问题,前提是没有其他答案做得更好,或者更早发布。
  2. 我对 Python 导入的理解是否准确,或者我是否遗漏了有关 R 的多个实例的要点?如果我错了,我应该如何编辑导入语句以便在每个子流程中创建一个新实例?这个问题的答案很可能会给我指明一个可能的解决方案,并且会被接受,前提是没有更好的答案,或者更早发布
  3. R 是在 rpy2 的所有实例之间共享还是每个 rpy2 实例都有一个 R 实例?此问题的答案只有在解决问题时才会被接受。

(...) Long story short (...)

真的吗?

  1. How might this issue be overcome? A working code example that does not raise the issue will be accepted as answer even if it does not answer any other question, provided no other answer does better, or was posted earlier.

答案可能会给您留下相当多的工作...

  1. Is my understanding of Python imports accurate, or am I missing the point about multiple instances of R? If I am wrong, how should I edit the import statements such that a new instance is created within each subprocess? Answers to this question are likely to point me towards a probable solution, and will be accepted, provided no answer does better, or was posted earlier

Python packages/modules 是 "uniquely" 在您的流程中导入的,这意味着流程中使用 package/module 的所有代码都使用相同的单一导入(您不在给定的块中每个 import 都有一个副本)。

因此,我建议在创建池时使用初始化函数,而不是在每次将任务发送给工作人员时重复导入 rpy2 和设置转换。如果每个任务都很短,您也可能会提高性能。

def arima_select(y, order):
    # FIXME: check whether the rpy2.robjects package
    #        should be (re) imported as ro to be visible          
    res = forecast.Arima(y, order=ro.FloatVector(order))
    return res

forecast = None

def worker_init():
    from rpy2 import robjects as ro
    from rpy2.robjects.packages import importr
    from rpy2.robjects import pandas2ri
    pandas2ri.activate()
    global forecast
    forecast = importr('forecast')

def applyParallel(groups, func):
    from multiprocessing import Pool, cpu_count
    with Pool(cpu_count(), worker_init) as p:
        ret_list = p.map(func, [group for _, group in groups])
    return pd.concat(ret_list, keys=[name for name, _ in groups])
  1. Is R shared among all instances of rpy2 or is there an instance of R for each instance of rpy2? Answers to this question will be accepted only if they lead to a resolution of the problem.

rpy2 通过链接其 C 共享库使 R 可用。一个这样的库 per Python process,这是一个有状态的库(R 无法处理并发)。我认为您的问题与对象序列化(参见 http://rpy2.readthedocs.io/en/version_2.8.x/robjects_serialization.html#object-serialization)有关,而不是与并发有关。

在 Python 腌制 rpy2 对象后重建 R 对象时,发生了一些明显的混乱。更具体地说,当查看错误消息中提到的 R 对象类型时:

>>> from rpy2.rinterface import str_typeint
>>> str_typeint(6)
'LANGSXP'
>>> str_typeint(24)
'RAWSXP'

我猜测 return 由 forecast.Arima 编辑的 R 对象包含一个未计算的 R 表达式(例如导致该结果对象的调用)并且在序列化和反序列化时它会返回作为不同的东西(字节的原始向量)。这可能是 R 自己的序列化机制的错误(因为 rpy2 在后台使用它)。现在,为了解决您的问题,您可能想要从工作人员的函数调用 运行 中提取您最关心的 forecast.Arima 并且仅 return 的内容。

对问题工作中提供的伪代码中的 arima_select 函数进行了以下更改:

import numpy as np
import pandas as pd
from rpy2 import rinterface as ri

ri.initr()

def arima_select(y, order):

    def rimport(packname):
        as_environment = ri.baseenv['as.environment']
        require = ri.baseenv['require']
        require(ri.StrSexpVector([packname]),
                quiet = ri.BoolSexpVector((True, )))
        packname = ri.StrSexpVector(['package:' + str(packname)])
        pack_env = as_environment(packname)
        return pack_env

    frcst = rimport("forecast")
    args = (('y', ri.FloatSexpVector(y)),
            ('order', ri.FloatSexpVector(order)),
            ('include.constant', ri.StrSexpVector(const)))
    return frcst['Arima'].rcall(args, ri.globalenv)

保持伪代码的其余部分不变。请注意,此后我进一步优化了代码,它不需要问题中提供的所有功能。基本上,以下是必要且充分的:

import numpy as np
import pandas as pd
from rpy2 import rinterface as ri

ri.initr()

def arima(y, order=(1,1,1)):
    # This is the same as arima_select above, just renamed to arima
    ...

def applyParallel(groups, func):
    from multiprocessing import Pool, cpu_count
    with Pool(cpu_count(), worker_init) as p:
        ret_list = p.map(func, [group for _, group in groups])
    return pd.concat(ret_list, keys=[name for name, _ in groups])

def main():
    # Create your df in your favorite way:
    def data_gen(start_day):
        r = pd.Series(pd.date_range('2016-09-{}'.format(str(start_day)),
                                    periods=24*60, freq='T'),
                      name='tstamp')
        d = pd.Series(np.random.randint(10, 80, 1440), name='val')
        s = pd.Series(['sensor1']*1440, name='sensor')
        return pd.concat([s, r, d], axis=1)
    df = pd.concat([data_gen(day) for day in range(1,8)], ignore_index=True)

    applyParallel(df.groupby(['sensor', pd.Grouper(key='tstamp', freq='D')]),
                  arima) # Note one may use partial from functools to pass order to arima

请注意,我也不直接从 applyParallel 调用 arima,因为我的目标是为给定系列(传感器和日期)找到最佳模型。我使用函数 arima_wrapper 遍历订单组合,并在每次迭代时调用 arima