调用时来自 Dask 的 PicklingError Client.Submit

PicklingError from Dask when calling Client.Submit

我有一个回测引擎,我尝试使用 dask 对其进行并行化。我可以独立处理每周的数据,所以我认为我可以通过解雇一个工作人员来轻松并行化,该工作人员将创建一个完整的回测引擎实例,并在一周的数据上 运行 它。

这是我根据Dask Futures docs.

尝试的大纲代码
from dask.distributed import Client
from backtest_engine import *

def run_backtest(start_date, end_date):
    engine = backtest_engine()
    engine.price_spread = 2
    engine.emulate_ticks = True
    engine.run_walk_forward(start_date, end_date, 'my_market', my_strategy_class)
    return True

if __name__ == "__main__":
    client = Client()
    a = client.submit(run_backtest, datetime(2017,9,3), datetime(2017,9,9))
    b = client.submit(run_backtest, datetime(2017,9,10), datetime(2017,9,17))

代码在调用 client.submit() 时失败:

_pickle.PicklingError: Could not pickle object as excessively deep recursion required.

代码有问题吗,或者我的方法在某些方面存在根本性缺陷,或者我可以在 worker 中进行的调用有限制吗?

伊恩

对于背景:run_walk_forward() 将 HD5 文件加载到 Pandas 数据帧中,遍历数据帧以产生结果,然后将结果写入磁盘。

Dask 使用 cloudpickle 进行函数序列化。我建议尝试 cloudpickle.loads(cloudpickle.dumps(obj)) 你的函数和你对 client.submit.

的每个参数

例如,您的函数的某些依赖项(如 backtest_engine)可能依赖于锁定或打开的文件,这些文件不容易序列化并在机器之间发送。

按照 MRocklin 的上述建议,我 运行 逐个元素检查 class,这里是导致上述异常的问题。

1. Class 有内部函数

感谢 Emlyn O'Regan blogpost 这让我意识到我的代码执行了以下操作并且无法 pickle:

class A:
   def __init__(self, parent):
        self.parent = parent
   def do_something():
        # access self.parent

class B:
   def my_func():
      a = A(self)
      a.do_something()

已通过重写代码将属性作为参数传递给适当的方法来修复。

2。 Class 引用的全局变量

这只是匆忙编写的代码,我在 class 定义之外放置了一些常量变量(参数在启动时设置一次),然后从需要的 class 中引用被腌制。

已通过将变量移动到 class 定义中进行修复。

3。 Class 在主模块中定义

我从主模块中的基础 class 继承来创建一个具有覆盖行为的新 class。新的 class 位于 main 名称 space,因此无法 pickle。

解决方案只是将继承的 class 的定义移动到一个单独的文件中,因此它位于 space 而不是 main 中。即代替:

my_script.py

class A:
   # define stuff

if __name__ == "__main__":
   # do stuff which will instantiate A

创建两个文件:

class_a.py

class A:
   #define stuff

main.py

   from class_a import A
   if __name__ == "__main__":
   # do stuff which at some point will instantiate A

伊恩