调用时来自 Dask 的 PicklingError Client.Submit
PicklingError from Dask when calling Client.Submit
我有一个回测引擎,我尝试使用 dask 对其进行并行化。我可以独立处理每周的数据,所以我认为我可以通过解雇一个工作人员来轻松并行化,该工作人员将创建一个完整的回测引擎实例,并在一周的数据上 运行 它。
这是我根据Dask Futures docs.
尝试的大纲代码
from dask.distributed import Client
from backtest_engine import *
def run_backtest(start_date, end_date):
engine = backtest_engine()
engine.price_spread = 2
engine.emulate_ticks = True
engine.run_walk_forward(start_date, end_date, 'my_market', my_strategy_class)
return True
if __name__ == "__main__":
client = Client()
a = client.submit(run_backtest, datetime(2017,9,3), datetime(2017,9,9))
b = client.submit(run_backtest, datetime(2017,9,10), datetime(2017,9,17))
代码在调用 client.submit()
时失败:
_pickle.PicklingError: Could not pickle object as excessively deep recursion required.
代码有问题吗,或者我的方法在某些方面存在根本性缺陷,或者我可以在 worker 中进行的调用有限制吗?
伊恩
对于背景:run_walk_forward()
将 HD5 文件加载到 Pandas 数据帧中,遍历数据帧以产生结果,然后将结果写入磁盘。
Dask 使用 cloudpickle 进行函数序列化。我建议尝试 cloudpickle.loads(cloudpickle.dumps(obj))
你的函数和你对 client.submit
.
的每个参数
例如,您的函数的某些依赖项(如 backtest_engine
)可能依赖于锁定或打开的文件,这些文件不容易序列化并在机器之间发送。
按照 MRocklin 的上述建议,我 运行 逐个元素检查 class,这里是导致上述异常的问题。
1. Class 有内部函数
感谢 Emlyn O'Regan blogpost 这让我意识到我的代码执行了以下操作并且无法 pickle:
class A:
def __init__(self, parent):
self.parent = parent
def do_something():
# access self.parent
class B:
def my_func():
a = A(self)
a.do_something()
已通过重写代码将属性作为参数传递给适当的方法来修复。
2。 Class 引用的全局变量
这只是匆忙编写的代码,我在 class 定义之外放置了一些常量变量(参数在启动时设置一次),然后从需要的 class 中引用被腌制。
已通过将变量移动到 class 定义中进行修复。
3。 Class 在主模块中定义
我从主模块中的基础 class 继承来创建一个具有覆盖行为的新 class。新的 class 位于 main 名称 space,因此无法 pickle。
解决方案只是将继承的 class 的定义移动到一个单独的文件中,因此它位于 space 而不是 main 中。即代替:
my_script.py
class A:
# define stuff
if __name__ == "__main__":
# do stuff which will instantiate A
创建两个文件:
class_a.py
class A:
#define stuff
main.py
from class_a import A
if __name__ == "__main__":
# do stuff which at some point will instantiate A
伊恩
我有一个回测引擎,我尝试使用 dask 对其进行并行化。我可以独立处理每周的数据,所以我认为我可以通过解雇一个工作人员来轻松并行化,该工作人员将创建一个完整的回测引擎实例,并在一周的数据上 运行 它。
这是我根据Dask Futures docs.
尝试的大纲代码from dask.distributed import Client
from backtest_engine import *
def run_backtest(start_date, end_date):
engine = backtest_engine()
engine.price_spread = 2
engine.emulate_ticks = True
engine.run_walk_forward(start_date, end_date, 'my_market', my_strategy_class)
return True
if __name__ == "__main__":
client = Client()
a = client.submit(run_backtest, datetime(2017,9,3), datetime(2017,9,9))
b = client.submit(run_backtest, datetime(2017,9,10), datetime(2017,9,17))
代码在调用 client.submit()
时失败:
_pickle.PicklingError: Could not pickle object as excessively deep recursion required.
代码有问题吗,或者我的方法在某些方面存在根本性缺陷,或者我可以在 worker 中进行的调用有限制吗?
伊恩
对于背景:run_walk_forward()
将 HD5 文件加载到 Pandas 数据帧中,遍历数据帧以产生结果,然后将结果写入磁盘。
Dask 使用 cloudpickle 进行函数序列化。我建议尝试 cloudpickle.loads(cloudpickle.dumps(obj))
你的函数和你对 client.submit
.
例如,您的函数的某些依赖项(如 backtest_engine
)可能依赖于锁定或打开的文件,这些文件不容易序列化并在机器之间发送。
按照 MRocklin 的上述建议,我 运行 逐个元素检查 class,这里是导致上述异常的问题。
1. Class 有内部函数
感谢 Emlyn O'Regan blogpost 这让我意识到我的代码执行了以下操作并且无法 pickle:
class A:
def __init__(self, parent):
self.parent = parent
def do_something():
# access self.parent
class B:
def my_func():
a = A(self)
a.do_something()
已通过重写代码将属性作为参数传递给适当的方法来修复。
2。 Class 引用的全局变量
这只是匆忙编写的代码,我在 class 定义之外放置了一些常量变量(参数在启动时设置一次),然后从需要的 class 中引用被腌制。
已通过将变量移动到 class 定义中进行修复。
3。 Class 在主模块中定义
我从主模块中的基础 class 继承来创建一个具有覆盖行为的新 class。新的 class 位于 main 名称 space,因此无法 pickle。
解决方案只是将继承的 class 的定义移动到一个单独的文件中,因此它位于 space 而不是 main 中。即代替:
my_script.py
class A:
# define stuff
if __name__ == "__main__":
# do stuff which will instantiate A
创建两个文件:
class_a.py
class A:
#define stuff
main.py
from class_a import A
if __name__ == "__main__":
# do stuff which at some point will instantiate A
伊恩