pandas + multiprocessing: "NotImplementedError: Not supported for DataFrames!"
pandas + multiprocessing: "NotImplementedError: Not supported for DataFrames!"
在我之前的帖子(previous thread)被标记为重复问题之后,它为我指明了多进程管理器(multiprocessing manager)的方向。我正在尝试使用 multiprocessing 创建一个服务来管理我的 pandas 数据帧,并将其提供给 Flask 请求。到目前为止,这是我的代码:
df_manager.py
from multiprocessing.managers import BaseManager
import pandas as pd
def init_dataframe():
    """Build the sample DataFrame used by this example.

    Returns:
        A two-row DataFrame with columns ``col1`` ([1, 2]) and
        ``col2`` ([3, 4]).
    """
    return pd.DataFrame(data={'col1': [1, 2], 'col2': [3, 4]})
def get_df():
    """Return the module-level DataFrame ``df``.

    This function is the callable registered with the manager under the
    ``get_df`` typeid.
    """
    return df
# Build the frame once at start-up; get_df() returns this same object.
df = init_dataframe()

# An empty host string with port 37844; clients must use the same authkey.
manager = BaseManager(address=('', 37844), authkey=b'password')

# NOTE: no `exposed` list is passed here, so the server inspects every public
# attribute of the returned object when it creates a proxy.
manager.register('get_df', callable=get_df)

# serve_forever() blocks, handling requests in this process.
server = manager.get_server()
server.serve_forever()
data_handler.py
from multiprocessing.managers import BaseManager
import pandas as pd
def get_df():
    """Connect to the DataFrame manager and request its ``get_df`` object.

    Returns:
        Whatever the manager's ``get_df`` typeid yields for this client
        (a proxy to the object held by the server process).
    """
    manager = BaseManager(('', 37844), b'password')
    # Registering without a callable only declares the typeid on the client.
    manager.register('get_df')
    manager.connect()
    return manager.get_df()
def data():
    """Fetch the shared DataFrame and return it converted with ``to_dict()``."""
    df = get_df()
    return df.to_dict()
# Run the client once when this file is executed as a script.
if __name__ == '__main__':
    data()
不幸的是,在调用 data_handler.py 中的 manager.get_df() 时会引发异常:
Traceback (most recent call last):
File "src/data_handler.py", line 15, in <module>
data()
File "src/data_handler.py", line 11, in data
df = get_df()
File "src/data_handler.py", line 8, in get_df
return manager.get_df()
File "/usr/lib/python3.7/multiprocessing/managers.py", line 724, in temp
token, exp = self._create(typeid, *args, **kwds)
File "/usr/lib/python3.7/multiprocessing/managers.py", line 609, in _create
id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
File "/usr/lib/python3.7/multiprocessing/managers.py", line 82, in dispatch
raise convert_to_error(kind, result)
multiprocessing.managers.RemoteError:
---------------------------------------------------------------------------
Traceback (most recent call last):
File "/usr/lib/python3.7/multiprocessing/managers.py", line 201, in handle_request
result = func(c, *args, **kwds)
File "/usr/lib/python3.7/multiprocessing/managers.py", line 391, in create
exposed = public_methods(obj)
File "/usr/lib/python3.7/multiprocessing/managers.py", line 122, in public_methods
return [name for name in all_methods(obj) if name[0] != '_']
File "/usr/lib/python3.7/multiprocessing/managers.py", line 113, in all_methods
func = getattr(obj, name)
File "/home/admin/dev/pandas-multiprocessing/venv/lib/python3.7/site-packages/pandas/core/frame.py", line 392, in _constructor_expanddim
raise NotImplementedError("Not supported for DataFrames!")
NotImplementedError: Not supported for DataFrames!
---------------------------------------------------------------------------
任何正确方向的帮助将不胜感激!
编辑:这似乎是由 DataFrame 本身引起的,因为在 df_manager.py 中返回 df.to_json() 而不是 df 时似乎可以正常工作。仍在调查中……
EDIT2:我已经更新了代码以删除 Flask 依赖项,因为它似乎与它无关。
此问题已通过公开(expose)BaseManager 所使用的代理的相关方法得到修复。这可以在 data_handler.py 中的 register 调用中完成(注意:下面的示例代码中,exposed 参数实际加在了 df_manager.py 的 register 调用上)。
df_manager.py
from multiprocessing.managers import BaseManager
import pandas as pd
def init_dataframe():
    """Build the sample DataFrame used by this example.

    Returns:
        A two-row DataFrame with columns ``col1`` ([1, 2]) and
        ``col2`` ([3, 4]).
    """
    return pd.DataFrame(data={'col1': [1, 2], 'col2': [3, 4]})
def get_df():
    """Return the module-level DataFrame ``df``.

    This function is the callable registered with the manager under the
    ``get_df`` typeid.
    """
    return df
# Build the frame once at start-up; get_df() returns this same object.
df = init_dataframe()

manager = BaseManager(('', 37844), b'password')

# Adding `exposed` parameter was the key to solving the issue: with an explicit
# list the server no longer inspects every attribute of the DataFrame.
# `exposed` must be a sequence of method names. A bare string such as 'get_df'
# is iterated character by character, which leaves no real DataFrame method
# callable through the proxy, so the methods the client needs are listed here.
manager.register(
    'get_df',
    callable=get_df,
    exposed=('__repr__', '__str__', 'to_dict', 'to_json'),
)

# serve_forever() blocks, handling requests in this process.
server = manager.get_server()
server.serve_forever()
data_handler.py
from multiprocessing.managers import BaseManager
import pandas as pd
def get_df():
    """Connect to the DataFrame manager and request its ``get_df`` object.

    Returns:
        Whatever the manager's ``get_df`` typeid yields for this client
        (a proxy to the object held by the server process).
    """
    manager = BaseManager(('', 37844), b'password')
    # Registering without a callable only declares the typeid on the client.
    manager.register('get_df')
    manager.connect()
    return manager.get_df()
def data():
    """Return the object obtained from the manager via ``get_df()``."""
    df = get_df()
    return df
# Print the fetched object when this file is executed as a script.
if __name__ == '__main__':
    print(data())
在我之前的帖子(previous thread)被标记为重复问题之后,它为我指明了多进程管理器(multiprocessing manager)的方向。到目前为止,这是我的代码:
df_manager.py
from multiprocessing.managers import BaseManager
import pandas as pd
def init_dataframe():
    """Build the sample DataFrame used by this example.

    Returns:
        A two-row DataFrame with columns ``col1`` ([1, 2]) and
        ``col2`` ([3, 4]).
    """
    return pd.DataFrame(data={'col1': [1, 2], 'col2': [3, 4]})
def get_df():
    """Return the module-level DataFrame ``df``.

    This function is the callable registered with the manager under the
    ``get_df`` typeid.
    """
    return df
# Build the frame once at start-up; get_df() returns this same object.
df = init_dataframe()

# An empty host string with port 37844; clients must use the same authkey.
manager = BaseManager(address=('', 37844), authkey=b'password')

# NOTE: no `exposed` list is passed here, so the server inspects every public
# attribute of the returned object when it creates a proxy.
manager.register('get_df', callable=get_df)

# serve_forever() blocks, handling requests in this process.
server = manager.get_server()
server.serve_forever()
data_handler.py
from multiprocessing.managers import BaseManager
import pandas as pd
def get_df():
    """Connect to the DataFrame manager and request its ``get_df`` object.

    Returns:
        Whatever the manager's ``get_df`` typeid yields for this client
        (a proxy to the object held by the server process).
    """
    manager = BaseManager(('', 37844), b'password')
    # Registering without a callable only declares the typeid on the client.
    manager.register('get_df')
    manager.connect()
    return manager.get_df()
def data():
    """Fetch the shared DataFrame and return it converted with ``to_dict()``."""
    df = get_df()
    return df.to_dict()
# Run the client once when this file is executed as a script.
if __name__ == '__main__':
    data()
不幸的是,在调用 data_handler.py 中的 manager.get_df() 时会引发异常:
Traceback (most recent call last):
File "src/data_handler.py", line 15, in <module>
data()
File "src/data_handler.py", line 11, in data
df = get_df()
File "src/data_handler.py", line 8, in get_df
return manager.get_df()
File "/usr/lib/python3.7/multiprocessing/managers.py", line 724, in temp
token, exp = self._create(typeid, *args, **kwds)
File "/usr/lib/python3.7/multiprocessing/managers.py", line 609, in _create
id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
File "/usr/lib/python3.7/multiprocessing/managers.py", line 82, in dispatch
raise convert_to_error(kind, result)
multiprocessing.managers.RemoteError:
---------------------------------------------------------------------------
Traceback (most recent call last):
File "/usr/lib/python3.7/multiprocessing/managers.py", line 201, in handle_request
result = func(c, *args, **kwds)
File "/usr/lib/python3.7/multiprocessing/managers.py", line 391, in create
exposed = public_methods(obj)
File "/usr/lib/python3.7/multiprocessing/managers.py", line 122, in public_methods
return [name for name in all_methods(obj) if name[0] != '_']
File "/usr/lib/python3.7/multiprocessing/managers.py", line 113, in all_methods
func = getattr(obj, name)
File "/home/admin/dev/pandas-multiprocessing/venv/lib/python3.7/site-packages/pandas/core/frame.py", line 392, in _constructor_expanddim
raise NotImplementedError("Not supported for DataFrames!")
NotImplementedError: Not supported for DataFrames!
---------------------------------------------------------------------------
任何正确方向的帮助将不胜感激!
编辑:这似乎是由 DataFrame 本身引起的,因为在 df_manager.py 中返回 df.to_json() 而不是 df 时似乎可以正常工作。仍在调查中……
EDIT2:我已经更新了代码以删除 Flask 依赖项,因为它似乎与它无关。
此问题已通过公开(expose)BaseManager 所使用的代理的相关方法得到修复。这可以在 data_handler.py 中的 register 调用中完成(注意:下面的示例代码中,exposed 参数实际加在了 df_manager.py 的 register 调用上)。
df_manager.py
from multiprocessing.managers import BaseManager
import pandas as pd
def init_dataframe():
    """Build the sample DataFrame used by this example.

    Returns:
        A two-row DataFrame with columns ``col1`` ([1, 2]) and
        ``col2`` ([3, 4]).
    """
    return pd.DataFrame(data={'col1': [1, 2], 'col2': [3, 4]})
def get_df():
    """Return the module-level DataFrame ``df``.

    This function is the callable registered with the manager under the
    ``get_df`` typeid.
    """
    return df
# Build the frame once at start-up; get_df() returns this same object.
df = init_dataframe()

manager = BaseManager(('', 37844), b'password')

# Adding `exposed` parameter was the key to solving the issue: with an explicit
# list the server no longer inspects every attribute of the DataFrame.
# `exposed` must be a sequence of method names. A bare string such as 'get_df'
# is iterated character by character, which leaves no real DataFrame method
# callable through the proxy, so the methods the client needs are listed here.
manager.register(
    'get_df',
    callable=get_df,
    exposed=('__repr__', '__str__', 'to_dict', 'to_json'),
)

# serve_forever() blocks, handling requests in this process.
server = manager.get_server()
server.serve_forever()
data_handler.py
from multiprocessing.managers import BaseManager
import pandas as pd
def get_df():
    """Connect to the DataFrame manager and request its ``get_df`` object.

    Returns:
        Whatever the manager's ``get_df`` typeid yields for this client
        (a proxy to the object held by the server process).
    """
    manager = BaseManager(('', 37844), b'password')
    # Registering without a callable only declares the typeid on the client.
    manager.register('get_df')
    manager.connect()
    return manager.get_df()
def data():
    """Return the object obtained from the manager via ``get_df()``."""
    df = get_df()
    return df
# Print the fetched object when this file is executed as a script.
if __name__ == '__main__':
    print(data())