Load and dump to file in multithreading using Pickle and filelock - IOError: [Errno 13]

Load and dump to file in multithreading using Pickle and filelock - IOError: [Errno 13]

我有一项服务可以使用 python 2.7 [=12= 将数据从 python dict 加载和转储到文件中].此服务可被多人同时调用

什么方法允许 cPickle 在多线程上下文中读取数据并将其转储到单个文件中,以避免操作期间数据的去同步化(在另一个进程正在转储时加载)问题?

我正在考虑使用 filelock,但我还没有成功。

使用下面的代码,文件在 init_cache()update_cache()

中总是有 cPickle.load(cache_file)IOError: [Errno 13] Permission denied" 错误
''' example of a dict dumped by pickle

  { 
     "version": "1499180895", 
     "queries": { 
         "001::id,name,age" : "aBase64EncodedString==",
         "002::id,name,sex" : "anotherBase64EncodedString=="
      }
   }

'''


import cPickle as pickle
import filelock
from os import path

self.cache_file_path = "\\serverDisk\cache\cache.pkl"
self.select_by_values = "001"
self.out_fields = ["id", "name", "age"]

def get_from_cache_fn(self):
    try:
        server_version = self.query_version()
        query_id = "{}::{}".format(self.select_by_values, ",".join(self.out_fields))
        if path.isfile(self.cache_file_path):
            cache_dict = self.load_cache(server_version, query_id)
            if cache_dict["version"] == server_version:
                if query_id in cache_dict["queries"]:
                     return cache_dict["queries"][query_id]
                else:
                    return self.update_cache(cache_dict, query_id)["queries"][query_id]
            else:
                return self.init_cache(server_version, query_id)["queries"][query_id]
        else:
            return self.init_cache(server_version, query_id)["queries"][query_id]
    except Exception:
        self.add_service_error(ERRORS["get_from_cache"][0], traceback.format_exc())


def load_cache(self, server_version, query_id):
    with open(self.cache_file_path, "rb") as cache_file:
        try:
            cache_dict = pickle.load(cache_file)
            return cache_dict
        except StandardError:
            return self.init_cache(server_version, query_id)


def init_cache(self, server_version, query_id):
    cache_dict = {
        "version" : server_version,
        "queries" : {
            query_id : base64.b64encode(zlib.compress(json.dumps(self.query_features())))
        }
    }
    lock = filelock.FileLock(self.cache_file_path)
    try:
        with lock.acquire(timeout=10):
            with open(self.cache_file_path, "wb") as cache_file:
                pickle.dump(cache_dict, cache_file)
                return cache_dict
    except lock.Timeout:
        self.add_service_error("init_cache timeout", traceback.format_exc())


def update_cache(self, cache_dict, query_id):
    cache_dict["queries"][query_id] = base64.b64encode(zlib.compress(json.dumps(self.query_features())))
    lock = filelock.FileLock(self.cache_file_path)
    try:
        with lock.acquire(timeout = 10):
            with open(self.cache_file_path, "wb") as cache_file:
                pickle.dump(cache_dict, cache_file)
                return cache_dict
    except lock.Timeout:
        self.add_service_error("update_cache timeout", traceback.format_exc())

根据 Filelock 文档,您应该将 lock.acquire 包裹在 try{}except{} 中。否则,当您的获取超时时,它可能会使您的应用程序因未处理的异常而崩溃。参见 https://pypi.python.org/pypi/filelock

我找到了解决问题的方法。

看来您必须提供与您打开的文件不同的锁名称。

lock = filelock.FileLock("{}.lock".format(self.cache_file_path)) 而不是 lock = filelock.FileLock(self.cache_file_path)

例如:

def update_cache(self, cache_dict, query_id):
    cache_dict["queries"][query_id] = base64.b64encode(zlib.compress(json.dumps(self.query_features())))
    lock = lock = filelock.FileLock("{}.lock".format(self.cache_file_path))
    try:
        with lock.acquire(timeout = 10):
            with open(self.cache_file_path, "wb") as cache_file:
                pickle.dump(cache_dict, cache_file)
                return cache_dict
    except lock.Timeout:
        self.add_service_error("update_cache timeout", traceback.format_exc())