在不同的进程之间共享内存中的复杂 python 对象
Sharing a complex python object in memory between separate processes
我有一个复杂的 python 对象,内存大小约为 36GB,我想在多个单独的 python 进程之间共享它。它作为 pickle 文件存储在磁盘上,我目前为每个进程单独加载它。我想共享此对象,以便在可用内存量下并行执行更多进程。
从某种意义上说,此对象用作只读数据库。每个进程每秒发起多次访问请求,每次请求只针对一小部分数据。
我研究了像 Radis 这样的解决方案,但我发现最终需要将数据序列化为简单的文本形式。此外,将 pickle 文件本身映射到内存也无济于事,因为每个进程都需要提取它。所以我想到了另外两种可能的解决方案:
- 使用共享内存,每个进程都可以访问存储对象的地址。这里的问题是进程只会看到大量的字节,无法解释
- 通过 API 调用编写保存此对象并管理数据检索的代码。在这里,我想知道这种解决方案在速度方面的表现。
有没有一种简单的方法可以实现这两种解决方案?对于这种情况,也许有更好的解决方案?
非常感谢!
对于复杂的对象,没有现成的方法可以直接在进程之间共享内存。如果你有简单的 ctypes
,你可以在 c 风格的共享内存中执行此操作,但它不会直接映射到 python 个对象。
如果您在任何时候只需要一部分数据,而不是整个 36GB,那么有一个简单的解决方案很有效。为此,您可以使用 multiprocessing.managers
中的 SyncManager
。使用它,您可以设置一个服务器,为您的数据提供代理 class(您的数据不存储在 class 中,代理只提供对它的访问)。然后,您的客户端使用 BaseManager
连接到服务器并调用代理 class 中的方法来检索数据。
在幕后,Manager
classes 负责处理您请求的数据并通过开放端口将其从服务器发送到客户端。因为您在每次调用时都在挑选数据,所以如果您需要整个数据集,那么效率不高。在客户端只需要一小部分数据的情况下,这种方法可以节省很多时间,因为数据只需要由服务器加载一次。
该解决方案在速度方面可与数据库解决方案相媲美,但如果您希望保持纯粹的 pythonic 解决方案,它可以为您节省大量复杂性和 DB 学习。
下面是一些用于 GloVe 词向量的示例代码。
服务器
#!/usr/bin/python
import sys
from multiprocessing.managers import SyncManager
import numpy
# Global for storing the data to be served
gVectors = {}
# Proxy class to be shared with different processes
# Don't but the big vector data in here since that will force it to
# be piped to the other process when instantiated there, instead just
# return the global vector data, from this process, when requested.
class GloVeProxy(object):
def __init__(self):
pass
def getNVectors(self):
global gVectors
return len(gVectors)
def getEmpty(self):
global gVectors
return numpy.zeros_like(gVectors.values()[0])
def getVector(self, word, default=None):
global gVectors
return gVectors.get(word, default)
# Class to encapsulate the server functionality
class GloVeServer(object):
def __init__(self, port, fname):
self.port = port
self.load(fname)
# Load the vectors into gVectors (global)
@staticmethod
def load(filename):
global gVectors
f = open(filename, 'r')
for line in f:
vals = line.rstrip().split(' ')
gVectors[vals[0]] = numpy.array(vals[1:]).astype('float32')
# Run the server
def run(self):
class myManager(SyncManager): pass
myManager.register('GloVeProxy', GloVeProxy)
mgr = myManager(address=('', self.port), authkey='GloVeProxy01')
server = mgr.get_server()
server.serve_forever()
if __name__ == '__main__':
port = 5010
fname = '/mnt/raid/Data/Misc/GloVe/WikiGiga/glove.6B.50d.txt'
print 'Loading vector data'
gs = GloVeServer(port, fname)
print 'Serving data. Press <ctrl>-c to stop.'
gs.run()
客户端
from multiprocessing.managers import BaseManager
import psutil #3rd party module for process info (not strictly required)
# Grab the shared proxy class. All methods in that class will be availble here
class GloVeClient(object):
def __init__(self, port):
assert self._checkForProcess('GloVeServer.py'), 'Must have GloVeServer running'
class myManager(BaseManager): pass
myManager.register('GloVeProxy')
self.mgr = myManager(address=('localhost', port), authkey='GloVeProxy01')
self.mgr.connect()
self.glove = self.mgr.GloVeProxy()
# Return the instance of the proxy class
@staticmethod
def getGloVe(port):
return GloVeClient(port).glove
# Verify the server is running
@staticmethod
def _checkForProcess(name):
for proc in psutil.process_iter():
if proc.name() == name:
return True
return False
if __name__ == '__main__':
port = 5010
glove = GloVeClient.getGloVe(port)
for word in ['test', 'cat', '123456']:
print('%s = %s' % (word, glove.getVector(word)))
请注意,psutil
库仅用于检查您是否拥有服务器 运行,这不是必需的。请务必将服务器命名为 GloVeServer.py
或在代码中通过 psutil
更改检查,以便它查找正确的名称。
我有一个复杂的 python 对象,内存大小约为 36GB,我想在多个单独的 python 进程之间共享它。它作为 pickle 文件存储在磁盘上,我目前为每个进程单独加载它。我想共享此对象,以便在可用内存量下并行执行更多进程。
从某种意义上说,此对象用作只读数据库。每个进程每秒发起多次访问请求,每次请求只针对一小部分数据。
我研究了像 Radis 这样的解决方案,但我发现最终需要将数据序列化为简单的文本形式。此外,将 pickle 文件本身映射到内存也无济于事,因为每个进程都需要提取它。所以我想到了另外两种可能的解决方案:
- 使用共享内存,每个进程都可以访问存储对象的地址。这里的问题是进程只会看到大量的字节,无法解释
- 通过 API 调用编写保存此对象并管理数据检索的代码。在这里,我想知道这种解决方案在速度方面的表现。
有没有一种简单的方法可以实现这两种解决方案?对于这种情况,也许有更好的解决方案?
非常感谢!
对于复杂的对象,没有现成的方法可以直接在进程之间共享内存。如果你有简单的 ctypes
,你可以在 c 风格的共享内存中执行此操作,但它不会直接映射到 python 个对象。
如果您在任何时候只需要一部分数据,而不是整个 36GB,那么有一个简单的解决方案很有效。为此,您可以使用 multiprocessing.managers
中的 SyncManager
。使用它,您可以设置一个服务器,为您的数据提供代理 class(您的数据不存储在 class 中,代理只提供对它的访问)。然后,您的客户端使用 BaseManager
连接到服务器并调用代理 class 中的方法来检索数据。
在幕后,Manager
classes 负责处理您请求的数据并通过开放端口将其从服务器发送到客户端。因为您在每次调用时都在挑选数据,所以如果您需要整个数据集,那么效率不高。在客户端只需要一小部分数据的情况下,这种方法可以节省很多时间,因为数据只需要由服务器加载一次。
该解决方案在速度方面可与数据库解决方案相媲美,但如果您希望保持纯粹的 pythonic 解决方案,它可以为您节省大量复杂性和 DB 学习。
下面是一些用于 GloVe 词向量的示例代码。
服务器
#!/usr/bin/python
import sys
from multiprocessing.managers import SyncManager
import numpy
# Global for storing the data to be served
gVectors = {}
# Proxy class to be shared with different processes
# Don't but the big vector data in here since that will force it to
# be piped to the other process when instantiated there, instead just
# return the global vector data, from this process, when requested.
class GloVeProxy(object):
def __init__(self):
pass
def getNVectors(self):
global gVectors
return len(gVectors)
def getEmpty(self):
global gVectors
return numpy.zeros_like(gVectors.values()[0])
def getVector(self, word, default=None):
global gVectors
return gVectors.get(word, default)
# Class to encapsulate the server functionality
class GloVeServer(object):
def __init__(self, port, fname):
self.port = port
self.load(fname)
# Load the vectors into gVectors (global)
@staticmethod
def load(filename):
global gVectors
f = open(filename, 'r')
for line in f:
vals = line.rstrip().split(' ')
gVectors[vals[0]] = numpy.array(vals[1:]).astype('float32')
# Run the server
def run(self):
class myManager(SyncManager): pass
myManager.register('GloVeProxy', GloVeProxy)
mgr = myManager(address=('', self.port), authkey='GloVeProxy01')
server = mgr.get_server()
server.serve_forever()
if __name__ == '__main__':
port = 5010
fname = '/mnt/raid/Data/Misc/GloVe/WikiGiga/glove.6B.50d.txt'
print 'Loading vector data'
gs = GloVeServer(port, fname)
print 'Serving data. Press <ctrl>-c to stop.'
gs.run()
客户端
from multiprocessing.managers import BaseManager
import psutil #3rd party module for process info (not strictly required)
# Grab the shared proxy class. All methods in that class will be availble here
class GloVeClient(object):
def __init__(self, port):
assert self._checkForProcess('GloVeServer.py'), 'Must have GloVeServer running'
class myManager(BaseManager): pass
myManager.register('GloVeProxy')
self.mgr = myManager(address=('localhost', port), authkey='GloVeProxy01')
self.mgr.connect()
self.glove = self.mgr.GloVeProxy()
# Return the instance of the proxy class
@staticmethod
def getGloVe(port):
return GloVeClient(port).glove
# Verify the server is running
@staticmethod
def _checkForProcess(name):
for proc in psutil.process_iter():
if proc.name() == name:
return True
return False
if __name__ == '__main__':
port = 5010
glove = GloVeClient.getGloVe(port)
for word in ['test', 'cat', '123456']:
print('%s = %s' % (word, glove.getVector(word)))
请注意,psutil
库仅用于检查您是否拥有服务器 运行,这不是必需的。请务必将服务器命名为 GloVeServer.py
或在代码中通过 psutil
更改检查,以便它查找正确的名称。