Python 重试打开文件
Python retry to open the file
尝试打开文件并在 n 秒后重试的最佳做法是什么?
目前,我是:
import os
from os import path
import shutil
dir_path = path.abspath(path.join("foo", "bar"))
destination_path = path.abspath(path.join("foo", "dest_dir"))
for f in dir_path:
try:
# try to open the file if the file isn't used any longer
file_opened = open(f, 'r')
# move the file on successful opening
shutil.move(file_opened, destination_path)
file_opened.close()
except IOError:
return False
所以,目前我不处理异常。我考虑创建额外的函数来打开文件并使用 time.sleep(n)
调用异常函数
不过,我敢肯定肯定还有其他事情......
我不使用
with open(f, 'rb') as file_opened:
do whatever`
编辑:
一个进程创建文件,我希望Python进程在确定文件写入/创建完成后移动文件。所以,我在上面的代码中添加了 shutil.move 来显示整个情况。
编辑:
请在下面找到我为解决问题而开发的代码。我最后编写了自己的自定义解决方案来处理它:
import os
from os import path
import psutil
from retry import retry
import shutil
from subprocess import check_output,Popen, PIPE
import glob
import time
class FileHandler:
def __init__(self, fn_source, dir_source):
self.file_source_name = fn_source
self.file_source_path = path.join(dir_source, self.file_source_name)
self.dir_dest_path = path.join(dir_source, "test")
self.file_dest_path = path.join(self.dir_dest_path, self.file_source_name)
def check_file(self):
if os.path.exists(self.file_source_path):
try:
os.rename(self.file_source_path, self.file_source_path)
print("file renamed")
return True
except:
print("can not rename the file..retrying")
time.sleep(1)
self.check_file()
else:
print("source file does not exist...retrying")
time.sleep(5)
self.check_file()
def check_destination(self):
if os.path.exists(self.file_source_path) and not os.path.exists(self.file_dest_path):
return True
elif os.path.exists(self.file_source_path) and os.path.exists(self.file_dest_path):
try:
print(self.file_dest_path, self.file_source_name)
os.remove(self.file_dest_path)
return True
except Exception as e:
print("can not remove the file..retrying")
time.sleep(5)
self.check_destination()
def move_file(self):
if self.check_destination():
print(self.file_source_path)
shutil.move(self.file_source_path, self.file_dest_path)
print("moved", str(self.file_source_path))
return True
else:
print("can not move the file..retrying")
time.sleep(1)
self.move_file()
def file_ops(self):
if self.check_file():
self.move_file()
else:
print("source does not exist")
time.sleep(1)
self.file_ops()
return True
def each_file_ops(fn, dir_source):
fh = FileHandler(fn, dir_source)
return fh.file_ops()
def main(dir_source):
dir_files = glob.glob(path.join(dir_source, '*.txt'))
if dir_files:
[each_file_ops(f, dir_source) for f in dir_files]
else:
print("source dir is empty")
time.sleep(1)
main(dir_source)
if __name__ == '__main__':
main(path.join(""))
您无需打开文件即可移动它。因为一旦你打开一个文件,它就处于打开状态,因此你不能移动它,就这样吧,当你在播放器中播放视频时,你试图删除它或剪切和粘贴系统不允许你。
import shutil
file_name = 'trytry.csv'
shutil.copyfile(file_name, '/Users/umeshkaushik/PycharmProjects/newtry.csv')
shutil.move(file_name, '/Users/umeshkaushik/PycharmProjects/newtry1.csv')
以上代码运行良好。只要确保您提供正确的输入和输出路径即可。
您可以使用 retry
模块进行此类重试。这使代码看起来更清晰。 pip install retry
应该安装模块
from retry import retry
import shutil
@retry((FileNotFoundError, IOError), delay=1, backoff=2, max_delay=10, tries=100)
def attempt_to_move_file(fname, dest_path):
# Your attempt to move file
# shutil.move(fname, destination_path)
使用上面的代码,当调用 attempt_to_move_file
时,只要我们点击 FileNotFoundError
或 IOError
就会重试(最多尝试 100 次)并且重试发生在睡眠 1、2、4、8、10、10、10 ... 尝试之间的秒数
尝试打开文件并在 n 秒后重试的最佳做法是什么?
目前,我是:
import os
from os import path
import shutil
dir_path = path.abspath(path.join("foo", "bar"))
destination_path = path.abspath(path.join("foo", "dest_dir"))
for f in dir_path:
try:
# try to open the file if the file isn't used any longer
file_opened = open(f, 'r')
# move the file on successful opening
shutil.move(file_opened, destination_path)
file_opened.close()
except IOError:
return False
所以,目前我不处理异常。我考虑创建额外的函数来打开文件并使用 time.sleep(n)
调用异常函数不过,我敢肯定肯定还有其他事情......
我不使用
with open(f, 'rb') as file_opened:
do whatever`
编辑:
一个进程创建文件,我希望Python进程在确定文件写入/创建完成后移动文件。所以,我在上面的代码中添加了 shutil.move 来显示整个情况。
编辑:
请在下面找到我为解决问题而开发的代码。我最后编写了自己的自定义解决方案来处理它:
import os
from os import path
import psutil
from retry import retry
import shutil
from subprocess import check_output,Popen, PIPE
import glob
import time
class FileHandler:
def __init__(self, fn_source, dir_source):
self.file_source_name = fn_source
self.file_source_path = path.join(dir_source, self.file_source_name)
self.dir_dest_path = path.join(dir_source, "test")
self.file_dest_path = path.join(self.dir_dest_path, self.file_source_name)
def check_file(self):
if os.path.exists(self.file_source_path):
try:
os.rename(self.file_source_path, self.file_source_path)
print("file renamed")
return True
except:
print("can not rename the file..retrying")
time.sleep(1)
self.check_file()
else:
print("source file does not exist...retrying")
time.sleep(5)
self.check_file()
def check_destination(self):
if os.path.exists(self.file_source_path) and not os.path.exists(self.file_dest_path):
return True
elif os.path.exists(self.file_source_path) and os.path.exists(self.file_dest_path):
try:
print(self.file_dest_path, self.file_source_name)
os.remove(self.file_dest_path)
return True
except Exception as e:
print("can not remove the file..retrying")
time.sleep(5)
self.check_destination()
def move_file(self):
if self.check_destination():
print(self.file_source_path)
shutil.move(self.file_source_path, self.file_dest_path)
print("moved", str(self.file_source_path))
return True
else:
print("can not move the file..retrying")
time.sleep(1)
self.move_file()
def file_ops(self):
if self.check_file():
self.move_file()
else:
print("source does not exist")
time.sleep(1)
self.file_ops()
return True
def each_file_ops(fn, dir_source):
fh = FileHandler(fn, dir_source)
return fh.file_ops()
def main(dir_source):
dir_files = glob.glob(path.join(dir_source, '*.txt'))
if dir_files:
[each_file_ops(f, dir_source) for f in dir_files]
else:
print("source dir is empty")
time.sleep(1)
main(dir_source)
if __name__ == '__main__':
main(path.join(""))
您无需打开文件即可移动它。因为一旦你打开一个文件,它就处于打开状态,因此你不能移动它,就这样吧,当你在播放器中播放视频时,你试图删除它或剪切和粘贴系统不允许你。
import shutil
file_name = 'trytry.csv'
shutil.copyfile(file_name, '/Users/umeshkaushik/PycharmProjects/newtry.csv')
shutil.move(file_name, '/Users/umeshkaushik/PycharmProjects/newtry1.csv')
以上代码运行良好。只要确保您提供正确的输入和输出路径即可。
您可以使用 retry
模块进行此类重试。这使代码看起来更清晰。 pip install retry
应该安装模块
from retry import retry
import shutil
@retry((FileNotFoundError, IOError), delay=1, backoff=2, max_delay=10, tries=100)
def attempt_to_move_file(fname, dest_path):
# Your attempt to move file
# shutil.move(fname, destination_path)
使用上面的代码,当调用 attempt_to_move_file
时,只要我们点击 FileNotFoundError
或 IOError
就会重试(最多尝试 100 次)并且重试发生在睡眠 1、2、4、8、10、10、10 ... 尝试之间的秒数