Load and dump to file in multithreading using Pickle and filelock - IOError: [Errno 13]
I have a service that loads data from and dumps data to a file out of a python dict, using python 2.7. The service can be called by several people at the same time.
What approach would allow cPickle to read and dump data to a single file in a multithreading context, so as to avoid the data getting out of sync (one caller loading while another process is dumping) during the operations?
I was thinking of using filelock, but I haven't been successful so far.
With the code below, cPickle.load(cache_file) always fails in init_cache() or update_cache() with an "IOError: [Errno 13] Permission denied" error.
''' example of a dict dumped by pickle
{
    "version": "1499180895",
    "queries": {
        "001::id,name,age" : "aBase64EncodedString==",
        "002::id,name,sex" : "anotherBase64EncodedString=="
    }
}
'''
import base64
import json
import traceback
import zlib

import cPickle as pickle
import filelock
from os import path
# attributes set on the service instance; the raw string keeps both
# leading backslashes of the UNC path intact
self.cache_file_path = r"\\serverDisk\cache\cache.pkl"
self.select_by_values = "001"
self.out_fields = ["id", "name", "age"]
def get_from_cache_fn(self):
    try:
        server_version = self.query_version()
        query_id = "{}::{}".format(self.select_by_values, ",".join(self.out_fields))
        if path.isfile(self.cache_file_path):
            cache_dict = self.load_cache(server_version, query_id)
            if cache_dict["version"] == server_version:
                if query_id in cache_dict["queries"]:
                    return cache_dict["queries"][query_id]
                else:
                    return self.update_cache(cache_dict, query_id)["queries"][query_id]
            else:
                return self.init_cache(server_version, query_id)["queries"][query_id]
        else:
            return self.init_cache(server_version, query_id)["queries"][query_id]
    except Exception:
        self.add_service_error(ERRORS["get_from_cache"][0], traceback.format_exc())
def load_cache(self, server_version, query_id):
    with open(self.cache_file_path, "rb") as cache_file:
        try:
            cache_dict = pickle.load(cache_file)
            return cache_dict
        except StandardError:
            return self.init_cache(server_version, query_id)
def init_cache(self, server_version, query_id):
    cache_dict = {
        "version": server_version,
        "queries": {
            query_id: base64.b64encode(zlib.compress(json.dumps(self.query_features())))
        }
    }
    lock = filelock.FileLock(self.cache_file_path)
    try:
        with lock.acquire(timeout=10):
            with open(self.cache_file_path, "wb") as cache_file:
                pickle.dump(cache_dict, cache_file)
                return cache_dict
    except filelock.Timeout:
        self.add_service_error("init_cache timeout", traceback.format_exc())
def update_cache(self, cache_dict, query_id):
    cache_dict["queries"][query_id] = base64.b64encode(zlib.compress(json.dumps(self.query_features())))
    lock = filelock.FileLock(self.cache_file_path)
    try:
        with lock.acquire(timeout=10):
            with open(self.cache_file_path, "wb") as cache_file:
                pickle.dump(cache_dict, cache_file)
                return cache_dict
    except filelock.Timeout:
        self.add_service_error("update_cache timeout", traceback.format_exc())
According to the Filelock documentation, you should wrap lock.acquire in a try/except block. Otherwise, when the acquire times out, the unhandled exception could crash your application. See https://pypi.python.org/pypi/filelock
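A minimal sketch of that pattern, assuming an illustrative cache.pkl path and a 10-second timeout (these names are not from the original post):

import filelock

lock = filelock.FileLock("cache.pkl.lock")
try:
    # acquire() returns a context manager; the lock is released on exit
    with lock.acquire(timeout=10):
        # critical section: safe to read or write cache.pkl here
        pass
except filelock.Timeout:
    # another process held the lock for more than 10 seconds;
    # log, retry, or fall back instead of crashing the service
    print("could not acquire cache.pkl.lock within 10 seconds")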
I found the solution to my problem.
It turns out you have to give the lock a name different from the file you are opening:
lock = filelock.FileLock("{}.lock".format(self.cache_file_path))
instead of lock = filelock.FileLock(self.cache_file_path)
For example:
def update_cache(self, cache_dict, query_id):
    cache_dict["queries"][query_id] = base64.b64encode(zlib.compress(json.dumps(self.query_features())))
    lock = filelock.FileLock("{}.lock".format(self.cache_file_path))
    try:
        with lock.acquire(timeout=10):
            with open(self.cache_file_path, "wb") as cache_file:
                pickle.dump(cache_dict, cache_file)
                return cache_dict
    except filelock.Timeout:
        self.add_service_error("update_cache timeout", traceback.format_exc())
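The fix above only guards the writers; to also keep a reader from observing a half-written pickle, the same separate .lock file can guard load_cache as well. A hedged sketch of that extension (not part of the original answer), reusing the question's attributes and helpers:

def load_cache(self, server_version, query_id):
    # take the same "<file>.lock" lock the writers use, so a load
    # never overlaps a concurrent dump
    lock = filelock.FileLock("{}.lock".format(self.cache_file_path))
    try:
        with lock.acquire(timeout=10):
            with open(self.cache_file_path, "rb") as cache_file:
                return pickle.load(cache_file)
    except filelock.Timeout:
        self.add_service_error("load_cache timeout", traceback.format_exc())
    except StandardError:
        # unreadable or corrupt cache: rebuild it from scratch
        return self.init_cache(server_version, query_id)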