Python 重试打开文件

Python retry to open the file

尝试打开文件并在 n 秒后重试的最佳做法是什么?

目前,我是:

import os
from os import path
import shutil

dir_path = path.abspath(path.join("foo", "bar"))
destination_path = path.abspath(path.join("foo", "dest_dir"))

for f in dir_path:
    try:
        # try to open the file if the file isn't used any longer
        file_opened = open(f, 'r')
        # move the file on successful opening
        shutil.move(file_opened, destination_path)
        file_opened.close()
    except IOError:
        return False

所以,目前我不处理异常。我考虑创建额外的函数来打开文件并使用 time.sleep(n)

调用异常函数

不过,我敢肯定肯定还有其他事情......

我不使用

with open(f, 'rb') as file_opened: 
    do whatever` 

编辑:

一个进程创建文件,我希望Python进程在确定文件写入/创建完成后移动文件。所以,我在上面的代码中添加了 shutil.move 来显示整个情况。

编辑:

请在下面找到我为解决问题而开发的代码。我最后编写了自己的自定义解决方案来处理它:

import os
from os import path
import psutil
from retry import retry
import shutil
from subprocess import check_output,Popen, PIPE
import glob
import time


class FileHandler:
    def __init__(self, fn_source, dir_source):
        self.file_source_name = fn_source
        self.file_source_path = path.join(dir_source, self.file_source_name)
        self.dir_dest_path = path.join(dir_source, "test")
        self.file_dest_path = path.join(self.dir_dest_path, self.file_source_name)

    def check_file(self):
        if os.path.exists(self.file_source_path):
            try:
                os.rename(self.file_source_path, self.file_source_path)
                print("file renamed")
                return True
            except:
                print("can not rename the file..retrying")
                time.sleep(1)
                self.check_file()
        else:
            print("source file does not exist...retrying")
            time.sleep(5)
            self.check_file()

    def check_destination(self):
        if os.path.exists(self.file_source_path) and not os.path.exists(self.file_dest_path):
            return True
        elif os.path.exists(self.file_source_path) and os.path.exists(self.file_dest_path):
            try:
                print(self.file_dest_path, self.file_source_name)
                os.remove(self.file_dest_path)
                return True
            except Exception as e:
                print("can not remove the file..retrying")
                time.sleep(5)
                self.check_destination()

    def move_file(self):
        if self.check_destination():
            print(self.file_source_path)
            shutil.move(self.file_source_path, self.file_dest_path)
            print("moved", str(self.file_source_path))
            return True
        else:
            print("can not move the file..retrying")
            time.sleep(1)
            self.move_file()

    def file_ops(self):
        if self.check_file():
            self.move_file()
        else:
            print("source does not exist")
            time.sleep(1)
            self.file_ops()
        return True


def each_file_ops(fn, dir_source):
    fh = FileHandler(fn, dir_source)
    return fh.file_ops()


def main(dir_source):
    dir_files = glob.glob(path.join(dir_source, '*.txt'))
    if dir_files:
        [each_file_ops(f, dir_source) for f in dir_files]
    else:
        print("source dir is empty")
        time.sleep(1)
        main(dir_source)


if __name__ == '__main__':
    main(path.join(""))

您无需打开文件即可移动它。因为一旦你打开一个文件,它就处于打开状态,因此你不能移动它,就这样吧,当你在播放器中播放视频时,你试图删除它或剪切和粘贴系统不允许你。

import shutil

file_name = 'trytry.csv'

shutil.copyfile(file_name, '/Users/umeshkaushik/PycharmProjects/newtry.csv')
shutil.move(file_name, '/Users/umeshkaushik/PycharmProjects/newtry1.csv')

以上代码运行良好。只要确保您提供正确的输入和输出路径即可。

您可以使用 retry 模块进行此类重试。这使代码看起来更清晰。 pip install retry 应该安装模块

from retry import retry
import shutil

@retry((FileNotFoundError, IOError), delay=1, backoff=2, max_delay=10, tries=100)
def attempt_to_move_file(fname, dest_path):
    # Your attempt to move file
    # shutil.move(fname, destination_path)

使用上面的代码,当调用 attempt_to_move_file 时,只要我们点击 FileNotFoundErrorIOError 就会重试(最多尝试 100 次)并且重试发生在睡眠 1、2、4、8、10、10、10 ... 尝试之间的秒数