nodes/edges 自定义存储的 NetworkX 可扩展性
NetworkX extensibility for custom storage of nodes/edges
NetworkX 是否支持自定义节点、边和属性的存储位置?例如,我想尝试 2 个选项:
使用 LevelDB / Kyoto Cabinet 之类的东西作为后备存储。
使用一些分布式数据库(Neo4j 甚至 HBase - 我只需要 nodes/edges 的分布式存储)作为后备存储。
NetworkX 支持这些东西的扩展点是什么?
应该可以通过子classing Graph class 并提供用户定义的工厂函数来扩展 networkx。
这些函数可以查询数据库并将结果缓存在 networkx 使用的字典中。
我无法从联机文档中找到这些行,但在 code 中您有:
Subclasses(高级):
Graph class 使用 dict-of-dict-of-dict 数据结构。
外部字典 (node_dict) 包含以节点为键的邻接列表。
接下来的dict(adjlist)表示邻接表,持有
由邻居键控的边缘数据。内部字典 (edge_attr) 表示
边缘数据并保存由属性名称键入的边缘属性值。
这三个字典中的每一个都可以由用户定义的替换
类似字典的对象。一般来说,dict-like 的特征应该是
维护,但可以添加额外的功能。更换其中之一
dicts 通过更改 class(!) 变量创建一个新图 class
为类似 dict 的结构持有工厂。变量名称
是 node_dict_factory、adjlist_dict_factory 和 edge_attr_dict_factory。
node_dict_factory : function, (default: dict)
Factory function to be used to create the outer-most dict
in the data structure that holds adjacency lists keyed by node.
It should require no arguments and return a dict-like object.
adjlist_dict_factory : function, (default: dict)
Factory function to be used to create the adjacency list
dict which holds edge data keyed by neighbor.
It should require no arguments and return a dict-like object
edge_attr_dict_factory : function, (default: dict)
Factory function to be used to create the edge attribute
dict which holds attrbute values keyed by attribute name.
It should require no arguments and return a dict-like object.
我不知道 networkx 有任何官方扩展。
我将 post 为外部存储设置 NetworkX 的细微差别。 Kikohs 正确地指出每个词典都有工厂。这些可以被覆盖。
对于持久存储,唯一真正需要特别注意的字典是节点字典。
必须特别注意类字典实现的行为方式。 NetworkX classes 中有一些代码可以更改从内存中的字典返回的值,而不会将它们重新设置。
例如:
self.succ[u][v]=datadict
self.pred[v][u]=datadict
这些值不会持久保存回存储后端。为了适应这一点,我使用了一个内存缓存,将对象保存在内存中,当它们被逐出时,它会将它们写入底层存储。
对于我使用的内存缓存 cachetools
。有关驱逐,请参阅:
对于底层存储,我使用了 plyvel
(https://plyvel.readthedocs.org/en/latest/),它是 LevelDB 的 Python 接口。
下面我也给出了字典的实现。请注意,代码中仍然存在错误和错误,并且还没有经过适当的测试,但您已经了解了大致的思路。
class PlyvelBatchWrite(object):
def __init__(self, plv_dict):
self.__batch = plv_dict._db.write_batch()
self.__plvd = plv_dict
def set(self, key, val):
self.__batch.put(self.__plvd.serializer.pack(key), self.__plvd.serializer.pack(val))
def delete(self, key):
self.__batch.delete(self.__plvd.serializer.pack(key))
def clear(self):
self.__batch.clear()
def commit(self):
self.__batch.write()
class PlyvelDict(MutableMapping):
def __init__(self, directory='', db=None, serializer_factory=None, cache_factory=None, **kwargs):
self.__directory = directory
ensure_directory(directory)
if isinstance(db, str) or db is None:
if db is None:
# generate UUID
db = str(uuid.uuid4())
self.__db = db
db = plyvel.DB(self.name(), **kwargs)
else:
self.__db = kwargs['db']
self._db = db
if serializer_factory:
self.serializer = serializer_factory()
else:
self.serializer = None
if cache_factory:
self.__cache = cache_factory(self.__cache_miss, self.__cache_evict)
else:
self.__cache = None
def name(self):
full_path = os.path.join(self.__directory, self.__db)
return full_path
def __cache_miss(self, key):
b_item = self._db.get(self.serializer.pack(key))
if b_item is not None:
return self.serializer.unpack(b_item)
else:
raise KeyError(key)
def __cache_evict(self, key, val):
self._db.put(self.serializer.pack(key), self.serializer.pack(val))
def __copy__(self):
return type(self)(self.__directory, self._db, type(self.serializer), type(self.__cache), db=self.__db)
def __getitem__(self, key):
return self.__cache[key]
def __setitem__(self, key, value):
if key in self.__cache:
self.__cache[key] = value
self.__write_to_db(key, value)
def __write_to_db(self, key, value):
self._db.put(self.serializer.pack(key), self.serializer.pack(value))
def __delitem__(self, key):
if key in self.__cache:
del self.__cache[key]
self._db.delete(self.serializer.pack(key))
def __iter__(self):
return self.iterkeys()
def __keytransform__(self, key):
return key
def __len__(self):
return self.count()
def __del__(self):
self.flush()
if not self._db.closed:
self._db.close()
# improved methods
def flush(self, write_to_db=False):
if self.__cache:
if write_to_db:
batch = self.set_batch()
for key, val in self.__cache.items():
batch.set(key, val)
batch.commit()
self.__cache.clear()
def set_batch(self):
return PlyvelBatchWrite(self)
def iteritems(self):
self.flush()
for key, value in self._db.iterator(include_key=True, include_value=True):
yield (self.serializer.unpack(key), self.serializer.unpack(value))
def iterkeys(self):
self.flush()
for key in self._db.iterator(include_key=True, include_value=False):
yield self.serializer.unpack(key)
def itervalues(self):
self.flush()
for val in self._db.iterator(include_key=False, include_value=True):
yield self.serializer.unpack(val)
def keys(self):
self.flush()
# fixes default method which calls __len__
return list(self.iterkeys())
def values(self):
self.flush()
return list(self.itervalues())
def has_key(self, key):
return key in self
def clear(self):
self.flush()
for k in self:
del self[k]
def count(self):
self.flush()
return sum(1 for key in self)
和图 class:
class PersistedGraph(nx.Graph):
def __init__(self, data=None, node_dict_factory=None, adjlist_dict_factory=None, edge_attr_dict_factory=None,
**attr):
if node_dict_factory:
self.node_dict_factory = node_dict_factory
if adjlist_dict_factory:
self.adjlist_dict_factory = adjlist_dict_factory
if edge_attr_dict_factory:
self.edge_attr_dict_factory = edge_attr_dict_factory
nx.Graph.__init__(self, data, **attr)
NetworkX 是否支持自定义节点、边和属性的存储位置?例如,我想尝试 2 个选项:
使用 LevelDB / Kyoto Cabinet 之类的东西作为后备存储。
使用一些分布式数据库(Neo4j 甚至 HBase - 我只需要 nodes/edges 的分布式存储)作为后备存储。
NetworkX 支持这些东西的扩展点是什么?
应该可以通过子classing Graph class 并提供用户定义的工厂函数来扩展 networkx。 这些函数可以查询数据库并将结果缓存在 networkx 使用的字典中。
我无法从联机文档中找到这些行,但在 code 中您有:
Subclasses(高级):
Graph class 使用 dict-of-dict-of-dict 数据结构。
外部字典 (node_dict) 包含以节点为键的邻接列表。 接下来的dict(adjlist)表示邻接表,持有 由邻居键控的边缘数据。内部字典 (edge_attr) 表示 边缘数据并保存由属性名称键入的边缘属性值。
这三个字典中的每一个都可以由用户定义的替换 类似字典的对象。一般来说,dict-like 的特征应该是 维护,但可以添加额外的功能。更换其中之一 dicts 通过更改 class(!) 变量创建一个新图 class 为类似 dict 的结构持有工厂。变量名称 是 node_dict_factory、adjlist_dict_factory 和 edge_attr_dict_factory。
node_dict_factory : function, (default: dict)
Factory function to be used to create the outer-most dict
in the data structure that holds adjacency lists keyed by node.
It should require no arguments and return a dict-like object.
adjlist_dict_factory : function, (default: dict)
Factory function to be used to create the adjacency list
dict which holds edge data keyed by neighbor.
It should require no arguments and return a dict-like object
edge_attr_dict_factory : function, (default: dict)
Factory function to be used to create the edge attribute
dict which holds attrbute values keyed by attribute name.
It should require no arguments and return a dict-like object.
我不知道 networkx 有任何官方扩展。
我将 post 为外部存储设置 NetworkX 的细微差别。 Kikohs 正确地指出每个词典都有工厂。这些可以被覆盖。
对于持久存储,唯一真正需要特别注意的字典是节点字典。
必须特别注意类字典实现的行为方式。 NetworkX classes 中有一些代码可以更改从内存中的字典返回的值,而不会将它们重新设置。
例如:
self.succ[u][v]=datadict
self.pred[v][u]=datadict
这些值不会持久保存回存储后端。为了适应这一点,我使用了一个内存缓存,将对象保存在内存中,当它们被逐出时,它会将它们写入底层存储。
对于我使用的内存缓存 cachetools
。有关驱逐,请参阅:
对于底层存储,我使用了 plyvel
(https://plyvel.readthedocs.org/en/latest/),它是 LevelDB 的 Python 接口。
下面我也给出了字典的实现。请注意,代码中仍然存在错误和错误,并且还没有经过适当的测试,但您已经了解了大致的思路。
class PlyvelBatchWrite(object):
def __init__(self, plv_dict):
self.__batch = plv_dict._db.write_batch()
self.__plvd = plv_dict
def set(self, key, val):
self.__batch.put(self.__plvd.serializer.pack(key), self.__plvd.serializer.pack(val))
def delete(self, key):
self.__batch.delete(self.__plvd.serializer.pack(key))
def clear(self):
self.__batch.clear()
def commit(self):
self.__batch.write()
class PlyvelDict(MutableMapping):
def __init__(self, directory='', db=None, serializer_factory=None, cache_factory=None, **kwargs):
self.__directory = directory
ensure_directory(directory)
if isinstance(db, str) or db is None:
if db is None:
# generate UUID
db = str(uuid.uuid4())
self.__db = db
db = plyvel.DB(self.name(), **kwargs)
else:
self.__db = kwargs['db']
self._db = db
if serializer_factory:
self.serializer = serializer_factory()
else:
self.serializer = None
if cache_factory:
self.__cache = cache_factory(self.__cache_miss, self.__cache_evict)
else:
self.__cache = None
def name(self):
full_path = os.path.join(self.__directory, self.__db)
return full_path
def __cache_miss(self, key):
b_item = self._db.get(self.serializer.pack(key))
if b_item is not None:
return self.serializer.unpack(b_item)
else:
raise KeyError(key)
def __cache_evict(self, key, val):
self._db.put(self.serializer.pack(key), self.serializer.pack(val))
def __copy__(self):
return type(self)(self.__directory, self._db, type(self.serializer), type(self.__cache), db=self.__db)
def __getitem__(self, key):
return self.__cache[key]
def __setitem__(self, key, value):
if key in self.__cache:
self.__cache[key] = value
self.__write_to_db(key, value)
def __write_to_db(self, key, value):
self._db.put(self.serializer.pack(key), self.serializer.pack(value))
def __delitem__(self, key):
if key in self.__cache:
del self.__cache[key]
self._db.delete(self.serializer.pack(key))
def __iter__(self):
return self.iterkeys()
def __keytransform__(self, key):
return key
def __len__(self):
return self.count()
def __del__(self):
self.flush()
if not self._db.closed:
self._db.close()
# improved methods
def flush(self, write_to_db=False):
if self.__cache:
if write_to_db:
batch = self.set_batch()
for key, val in self.__cache.items():
batch.set(key, val)
batch.commit()
self.__cache.clear()
def set_batch(self):
return PlyvelBatchWrite(self)
def iteritems(self):
self.flush()
for key, value in self._db.iterator(include_key=True, include_value=True):
yield (self.serializer.unpack(key), self.serializer.unpack(value))
def iterkeys(self):
self.flush()
for key in self._db.iterator(include_key=True, include_value=False):
yield self.serializer.unpack(key)
def itervalues(self):
self.flush()
for val in self._db.iterator(include_key=False, include_value=True):
yield self.serializer.unpack(val)
def keys(self):
self.flush()
# fixes default method which calls __len__
return list(self.iterkeys())
def values(self):
self.flush()
return list(self.itervalues())
def has_key(self, key):
return key in self
def clear(self):
self.flush()
for k in self:
del self[k]
def count(self):
self.flush()
return sum(1 for key in self)
和图 class:
class PersistedGraph(nx.Graph):
def __init__(self, data=None, node_dict_factory=None, adjlist_dict_factory=None, edge_attr_dict_factory=None,
**attr):
if node_dict_factory:
self.node_dict_factory = node_dict_factory
if adjlist_dict_factory:
self.adjlist_dict_factory = adjlist_dict_factory
if edge_attr_dict_factory:
self.edge_attr_dict_factory = edge_attr_dict_factory
nx.Graph.__init__(self, data, **attr)