实例属性不会使用多处理持久化
Instance attributes do not persist using multiprocessing
我遇到的问题是实例不保留对属性的更改,甚至不保留创建的新属性。我想我已经将其缩小到我的脚本利用多处理的事实,并且我认为当脚本 returns 到时,单独进程线程中实例发生的变化不是 'remembered'主线程。
基本上,我有几组数据需要并行处理。数据存储为属性,并通过 class 中的几种方法进行更改。在处理结束时,我希望 return 到主线程并连接来自每个对象实例的数据。但是,如上所述,当我在并行处理位完成后尝试使用数据访问实例属性时,那里什么也没有。好像在多处理位期间发生的任何更改都是 'forgotten'。
是否有解决此问题的明显解决方案?或者我是否需要重建我的代码以代替 return 处理后的数据,而不仅仅是 altering/storing 它作为实例属性?我想另一种解决方案是序列化数据,然后在必要时重新读取它,而不是仅仅将其保存在内存中。
这里可能值得注意的是我使用的是 pathos
模块而不是 python 的 multiprocessing
模块。我遇到了一些与酸洗有关的错误,类似于此处:Python multiprocessing PicklingError: Can't pickle <type 'function'>。我的代码分为几个模块,如前所述,数据处理方法包含在 class 中。
对不起文字墙。
编辑
这是我的代码:
import importlib
import pandas as pd
from pathos.helpers import mp
from provider import Provider
# list of data providers ... length is arbitrary
operating_providers = ['dataprovider1', 'dataprovider2', 'dataprovider3']
# create provider objects for each operating provider
provider_obj_list = []
for name in operating_providers:
loc = 'providers.%s' % name
module = importlib.import_module(loc)
provider_obj = Provider(module)
provider_obj_list.append(provider_obj)
processes = []
for instance in provider_obj_list:
process = mp.Process(target = instance.data_processing_func)
process.daemon = True
process.start()
processes.append(process)
for process in processes:
process.join()
# now that data_processing_func is complete for each set of data,
# stack all the data
stack = pd.concat((instance.data for instance in provider_obj_list))
我有许多模块(它们的名称在 operating_providers
中列出)包含特定于其数据源的属性。这些模块被迭代导入并传递给我在单独模块 (provider
) 中创建的 Provider class 的新实例。我将每个 Provider 实例附加到一个列表 (provider_obj_list
),然后迭代地创建单独的进程来调用实例方法 instance.data_processing_func
。此函数进行一些数据处理(每个实例访问完全不同的数据文件),并在此过程中创建新的实例属性,我需要在并行处理完成时访问这些属性。
我尝试使用多线程而不是多处理——在这种情况下,我的实例属性保持不变,这正是我想要的。但是,我不确定为什么会发生这种情况——我将不得不研究线程与多处理之间的区别。
感谢您的帮助!
下面是一些示例代码,展示了如何执行我在评论中概述的操作。我无法测试它,因为我没有安装 provider
或 pathos
,但它应该让您对我的建议有一个很好的了解。
import importlib
from pathos.helpers import mp
from provider import Provider
def process_data(loc):
module = importlib.import_module(loc)
provider_obj = Provider(module)
provider_obj.data_processing_func()
if __name__ == '__main__':
# list of data providers ... length is arbitrary
operating_providers = ['dataprovider1', 'dataprovider2', 'dataprovider3']
# create list of provider locations for each operating provider
provider_loc_list = []
for name in operating_providers:
loc = 'providers.%s' % name
provider_loc_list.append(loc)
processes = []
for loc in provider_loc_list:
process = mp.Process(target=process_data, args=(loc,))
process.daemon = True
process.start()
processes.append(process)
for process in processes:
process.join()
我遇到的问题是实例不保留对属性的更改,甚至不保留创建的新属性。我想我已经将其缩小到我的脚本利用多处理的事实,并且我认为当脚本 returns 到时,单独进程线程中实例发生的变化不是 'remembered'主线程。
基本上,我有几组数据需要并行处理。数据存储为属性,并通过 class 中的几种方法进行更改。在处理结束时,我希望 return 到主线程并连接来自每个对象实例的数据。但是,如上所述,当我在并行处理位完成后尝试使用数据访问实例属性时,那里什么也没有。好像在多处理位期间发生的任何更改都是 'forgotten'。
是否有解决此问题的明显解决方案?或者我是否需要重建我的代码以代替 return 处理后的数据,而不仅仅是 altering/storing 它作为实例属性?我想另一种解决方案是序列化数据,然后在必要时重新读取它,而不是仅仅将其保存在内存中。
这里可能值得注意的是我使用的是 pathos
模块而不是 python 的 multiprocessing
模块。我遇到了一些与酸洗有关的错误,类似于此处:Python multiprocessing PicklingError: Can't pickle <type 'function'>。我的代码分为几个模块,如前所述,数据处理方法包含在 class 中。
对不起文字墙。
编辑 这是我的代码:
import importlib
import pandas as pd
from pathos.helpers import mp
from provider import Provider
# list of data providers ... length is arbitrary
operating_providers = ['dataprovider1', 'dataprovider2', 'dataprovider3']
# create provider objects for each operating provider
provider_obj_list = []
for name in operating_providers:
loc = 'providers.%s' % name
module = importlib.import_module(loc)
provider_obj = Provider(module)
provider_obj_list.append(provider_obj)
processes = []
for instance in provider_obj_list:
process = mp.Process(target = instance.data_processing_func)
process.daemon = True
process.start()
processes.append(process)
for process in processes:
process.join()
# now that data_processing_func is complete for each set of data,
# stack all the data
stack = pd.concat((instance.data for instance in provider_obj_list))
我有许多模块(它们的名称在 operating_providers
中列出)包含特定于其数据源的属性。这些模块被迭代导入并传递给我在单独模块 (provider
) 中创建的 Provider class 的新实例。我将每个 Provider 实例附加到一个列表 (provider_obj_list
),然后迭代地创建单独的进程来调用实例方法 instance.data_processing_func
。此函数进行一些数据处理(每个实例访问完全不同的数据文件),并在此过程中创建新的实例属性,我需要在并行处理完成时访问这些属性。
我尝试使用多线程而不是多处理——在这种情况下,我的实例属性保持不变,这正是我想要的。但是,我不确定为什么会发生这种情况——我将不得不研究线程与多处理之间的区别。
感谢您的帮助!
下面是一些示例代码,展示了如何执行我在评论中概述的操作。我无法测试它,因为我没有安装 provider
或 pathos
,但它应该让您对我的建议有一个很好的了解。
import importlib
from pathos.helpers import mp
from provider import Provider
def process_data(loc):
module = importlib.import_module(loc)
provider_obj = Provider(module)
provider_obj.data_processing_func()
if __name__ == '__main__':
# list of data providers ... length is arbitrary
operating_providers = ['dataprovider1', 'dataprovider2', 'dataprovider3']
# create list of provider locations for each operating provider
provider_loc_list = []
for name in operating_providers:
loc = 'providers.%s' % name
provider_loc_list.append(loc)
processes = []
for loc in provider_loc_list:
process = mp.Process(target=process_data, args=(loc,))
process.daemon = True
process.start()
processes.append(process)
for process in processes:
process.join()