Python 脚本不会对所有文件运行,并且在我按 Ctrl-C 时不会结束
Python Script won't run against all files, and won't end when I hit Ctrl-C
我是 Python 的新手,我在使用 Watchdog 执行某些代码时遇到了问题。该代码应该在文件被修改或创建时将文件复制到它们各自的文件夹中。它将对一个文件起作用,但如果有更多文件匹配,它就会退出。出于某种原因,我也无法使用 Ctrl-C 停止程序。完整代码如下:
import os
import os.path
import shutil
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from watchdog.events import PatternMatchingEventHandler
# Folder whose contents are listed and which is handed to the observer (via `path`).
sourcepath='C:/Users/bhart/Downloads/'
# Directory listing taken once, when this line runs; nothing below refreshes it.
sourcefiles = os.listdir(sourcepath)
# Destination folders, one per file type (this first one is used for .xls/.xlsx).
destinationpath = 'C:/Users/bhart/Downloads/xls'
destinationpathcsv = 'C:/Users/bhart/Downloads/csv'
destinationpathtxt = 'C:/Users/bhart/Downloads/txt'
destinationpathpdf = 'C:/Users/bhart/Downloads/pdf'
# Alias of sourcepath; this is the value passed to observer.schedule().
path = sourcepath
# Plain handler instance; its on_modified/on_created attributes are assigned in the main block.
event_handler = FileSystemEventHandler()
def on_created(event):
    """Sort the files named in ``sourcefiles`` into their per-type folders.

    Installed as the handler's ``on_created`` callback. ``event`` is accepted
    but never inspected. The logic is reproduced exactly as asked in the
    question; only the lost indentation is restored.
    """
    # NOTE: ``sourcefiles`` is the module-level listing, not a fresh one.
    for file in sourcefiles:
        # NOTE: ``file`` is a bare name from os.listdir(), so this check is
        # resolved against the current working directory, not ``sourcepath``.
        if os.path.exists(file):
            if file.endswith('.xls') or file.endswith('.xlsx'):
                shutil.move(os.path.join(sourcepath,file), os.path.join(destinationpath,file))
            if file.endswith('.csv'):
                shutil.move(os.path.join(sourcepath,file), os.path.join(destinationpathcsv,file))
                print("CSV file moved.")
            if file.endswith('.txt'):
                print("TXT file moved")
                shutil.move(os.path.join(sourcepath,file), os.path.join(destinationpathtxt,file))
            if file.endswith('.pdf'):
                shutil.move(os.path.join(sourcepath,file), os.path.join(destinationpathpdf,file))
def on_modified(event):
    """Sort the files named in ``sourcefiles`` into their per-type folders.

    Installed as the handler's ``on_modified`` callback. ``event`` is accepted
    but never inspected. The logic is reproduced exactly as asked in the
    question; only the lost indentation is restored.
    """
    # NOTE: ``sourcefiles`` is the module-level listing, not a fresh one.
    for file in sourcefiles:
        # NOTE: ``file`` is a bare name from os.listdir(), so this check is
        # resolved against the current working directory, not ``sourcepath``.
        if os.path.exists(file):
            if file.endswith('.xls') or file.endswith('.xlsx'):
                shutil.move(os.path.join(sourcepath,file), os.path.join(destinationpath,file))
            if file.endswith('.csv'):
                shutil.move(os.path.join(sourcepath,file), os.path.join(destinationpathcsv,file))
            if file.endswith('.txt'):
                print("TXT file moved")
                shutil.move(os.path.join(sourcepath,file), os.path.join(destinationpathtxt,file))
            if file.endswith('.pdf'):
                shutil.move(os.path.join(sourcepath,file), os.path.join(destinationpathpdf,file))
if __name__ == "__main__":
    # Reproduced as asked in the question; only the lost indentation is restored.
    event_handler.on_modified = on_modified
    observer = Observer()
    observer.start()
    observer.schedule(event_handler, path, recursive=True)
    # join() waits for this observer's thread to end, so the statements below
    # are not reached while the first observer is still running.
    observer.join()
    event_handler.on_created = on_created
    observer = Observer()
    observer.start()
    observer.schedule(event_handler, path, recursive=True)
    observer.join()
    # Only the print() is guarded; the join() calls above sit outside the try.
    try:
        print("test")
    except KeyboardInterrupt:
        exit()
我不知道我是否解决了所有问题但是:
`listdir()` 给出的文件名不包含目录,因此即使只是用 `os.path.exists()` 做检查,也必须先用 `os.path.join()` 拼出完整路径:
if os.path.exists( os.path.join(sourcepath, file) ):
`listdir()` 只在被调用的那一刻给出一次文件名;要获取新的文件名,必须在事件处理函数内部(`for` 循环之前)重新调用它。
def on_created(event):
sourcefilenames = os.listdir(sourcepath)
for filename in sourcefilenames:
src = os.path.join(sourcepath, filename)
if os.path.exists(src):
# ... code ...
def on_modified(event):
sourcefilenames = os.listdir(sourcepath)
for filename in sourcefilenames:
src = os.path.join(sourcepath, filename)
if os.path.exists(src):
# ... code ...
`.join()` 会阻塞代码,一直等到你关闭程序;所以它先创建第一个 `Observer` 并等待它结束,之后才会创建第二个 `Observer`——但其实用一个 `Observer` 就够了。
你在 `on_created` 和 `on_modified` 中的代码看起来是相同的,因此两种情况可以共用一个函数:
def move_it(event):
sourcefilenames = os.listdir(sourcepath)
for filename in sourcefilenames:
src = os.path.join(sourcepath, filename)
if os.path.exists(src):
# ... code ...
if __name__ == "__main__":
    # One handler and one observer serve both event types.
    event_handler = FileSystemEventHandler()
    event_handler.on_modified = move_it
    event_handler.on_created = move_it
    observer = Observer()
    observer.start()
    observer.schedule(event_handler, sourcepath, recursive=True)
    # Blocks here until the observer thread ends.
    observer.join()
如果你想捕获 `Ctrl+C`,那么你应该把所有代码放在 `try/except` 中(或者至少把 `join()` 放在 `try/except` 中)。
我不知道你在 `Ctrl+C` 上遇到了什么问题,但它在 Linux 上可以正常工作。
if __name__ == "__main__":
    # Route both event kinds to the same callback.
    event_handler = FileSystemEventHandler()
    event_handler.on_modified = move_it
    event_handler.on_created = move_it

    observer = Observer()
    observer.schedule(event_handler, sourcepath, recursive=True)
    observer.start()
    try:
        # A timed join() returns control to the interpreter every second, so a
        # pending Ctrl+C can be raised; a bare join() may not be interruptible
        # on every platform (reportedly an issue on Windows).
        while observer.is_alive():
            observer.join(1)
    except KeyboardInterrupt:
        print('Stopped by Ctrl+C')
    finally:
        # Ask the observer thread to finish, then wait for it.
        observer.stop()
        observer.join()
一条建议:
如果使用字典,代码会更简单、更通用
# Sketch of the mapping: file extension -> destination folder (paths abbreviated).
{
".xls": "C:/.../xls",
".xlsx": "C:/.../xls",
# ...
}
这样你就可以用 `for` 循环来检查所有扩展名,而且随时可以向字典添加新的扩展名,而无需更改函数代码。
import os
import shutil
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
# Folder that is listed by move_it() and scheduled with the observer.
sourcepath = 'C:/Users/bhart/Downloads/'

# Lower-case file extension -> folder that files with that extension are moved to.
destinationpath = {
    '.xls': 'C:/Users/bhart/Downloads/xls',
    '.xlsx': 'C:/Users/bhart/Downloads/xls',
    '.csv': 'C:/Users/bhart/Downloads/csv',
    '.txt': 'C:/Users/bhart/Downloads/txt',
    '.pdf': 'C:/Users/bhart/Downloads/pdf',
}
def move_it(event):
    """Move every recognised file in ``sourcepath`` to its destination folder.

    Used as the callback for both "created" and "modified" events. ``event``
    is not inspected; the source folder is re-listed on every call so files
    that appeared since the last call are picked up. A failure on one file is
    reported and the remaining files are still processed.
    """
    for filename in os.listdir(sourcepath):
        src = os.path.join(sourcepath, filename)
        # Only regular files are candidates; sub-folders are left alone.
        if not os.path.isfile(src):
            continue
        lowered = filename.lower()
        for ext, dst in destinationpath.items():
            if not lowered.endswith(ext):
                continue
            try:
                # Create the destination on first use so the move cannot
                # fail merely because the folder is missing.
                os.makedirs(dst, exist_ok=True)
                shutil.move(src, os.path.join(dst, filename))
            except OSError as err:
                # e.g. the file vanished or is still locked by its writer;
                # report it rather than let the exception escape the callback.
                print('could not move:', filename, '->', dst, '-', err)
            else:
                print('move:', filename, '->', dst)
            break  # handled once its extension matched
if __name__ == "__main__":
    # Route both event kinds to the same callback.
    event_handler = FileSystemEventHandler()
    event_handler.on_modified = move_it
    event_handler.on_created = move_it

    observer = Observer()
    observer.schedule(event_handler, sourcepath, recursive=True)
    observer.start()
    try:
        # A timed join() returns control to the interpreter every second, so a
        # pending Ctrl+C can be raised; a bare join() may not be interruptible
        # on every platform (reportedly an issue on Windows).
        while observer.is_alive():
            observer.join(1)
    except KeyboardInterrupt:
        print('Stopped by Ctrl+C')
    finally:
        # Ask the observer thread to finish, then wait for it.
        observer.stop()
        observer.join()
编辑:
`event` 提供了 `event.src_path`、`event.event_type` 等属性,你可以用它们代替 `listdir()` 来获取文件路径。
import os
import shutil
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
# Folder that is scheduled with the observer.
sourcepath = 'C:/Users/bhart/Downloads/'

# Lower-case file extension -> folder that files with that extension are moved to.
destinationpath = {
    '.xls': 'C:/Users/bhart/Downloads/xls',
    '.xlsx': 'C:/Users/bhart/Downloads/xls',
    '.csv': 'C:/Users/bhart/Downloads/csv',
    '.txt': 'C:/Users/bhart/Downloads/txt',
    '.pdf': 'C:/Users/bhart/Downloads/pdf',
}
def move_it(event):
    """Move the file named by a watchdog event into the folder for its extension.

    Args:
        event: event object; ``is_directory`` and ``src_path`` are read here.
            ``event_type`` and ``key`` are also available and are handy to
            print while debugging.

    Directory events, files that no longer exist, and files already located
    in their destination folder are ignored. A failed move is reported
    instead of raising out of the callback.
    """
    if event.is_directory:
        return

    src = event.src_path
    src_dir, filename = os.path.split(src)

    # The same file can be reported more than once (for example "created"
    # followed by "modified"); by the second call it may already be gone.
    if not os.path.isfile(src):
        return

    lowered = filename.lower()
    for ext, dst in destinationpath.items():
        if not lowered.endswith(ext):
            continue

        # The destination folders are configured underneath the watched
        # folder and the watch is recursive, so skip files already in place.
        here = os.path.normcase(os.path.abspath(src_dir))
        there = os.path.normcase(os.path.abspath(dst))
        if here == there:
            return

        try:
            # Create the destination on first use so the move cannot fail
            # merely because the folder is missing.
            os.makedirs(dst, exist_ok=True)
            shutil.move(src, os.path.join(dst, filename))
        except OSError as err:
            # e.g. the file is still locked by its writer; report it rather
            # than let the exception escape the callback.
            print('could not move:', filename, '->', dst, '-', err)
        else:
            print('move:', filename, '->', dst)
        break  # handled once its extension matched
if __name__ == "__main__":
    # Route both event kinds to the same callback.
    event_handler = FileSystemEventHandler()
    event_handler.on_modified = move_it
    event_handler.on_created = move_it
    #event_handler.on_moved = move_it # ie. rename (but this need to check `dest_path`)

    observer = Observer()
    observer.schedule(event_handler, sourcepath, recursive=True)
    observer.start()
    try:
        # A timed join() returns control to the interpreter every second, so a
        # pending Ctrl+C can be raised; a bare join() may not be interruptible
        # on every platform (reportedly an issue on Windows).
        while observer.is_alive():
            observer.join(1)
    except KeyboardInterrupt:
        print('Stopped by Ctrl+C')
    finally:
        # Ask the observer thread to finish, then wait for it.
        observer.stop()
        observer.join()
我是 Python 的新手,我在使用 Watchdog 执行某些代码时遇到了问题。该代码应该在文件被修改或创建时将文件复制到它们各自的文件夹中。它将对一个文件起作用,但如果有更多文件匹配,它就会退出。出于某种原因,我也无法使用 Ctrl-C 停止程序。完整代码如下:
import os
import os.path
import shutil
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from watchdog.events import PatternMatchingEventHandler
# Folder whose contents are listed and which is handed to the observer (via `path`).
sourcepath='C:/Users/bhart/Downloads/'
# Directory listing taken once, when this line runs; nothing below refreshes it.
sourcefiles = os.listdir(sourcepath)
# Destination folders, one per file type (this first one is used for .xls/.xlsx).
destinationpath = 'C:/Users/bhart/Downloads/xls'
destinationpathcsv = 'C:/Users/bhart/Downloads/csv'
destinationpathtxt = 'C:/Users/bhart/Downloads/txt'
destinationpathpdf = 'C:/Users/bhart/Downloads/pdf'
# Alias of sourcepath; this is the value passed to observer.schedule().
path = sourcepath
# Plain handler instance; its on_modified/on_created attributes are assigned in the main block.
event_handler = FileSystemEventHandler()
def on_created(event):
    """Sort the files named in ``sourcefiles`` into their per-type folders.

    Installed as the handler's ``on_created`` callback. ``event`` is accepted
    but never inspected. The logic is reproduced exactly as asked in the
    question; only the lost indentation is restored.
    """
    # NOTE: ``sourcefiles`` is the module-level listing, not a fresh one.
    for file in sourcefiles:
        # NOTE: ``file`` is a bare name from os.listdir(), so this check is
        # resolved against the current working directory, not ``sourcepath``.
        if os.path.exists(file):
            if file.endswith('.xls') or file.endswith('.xlsx'):
                shutil.move(os.path.join(sourcepath,file), os.path.join(destinationpath,file))
            if file.endswith('.csv'):
                shutil.move(os.path.join(sourcepath,file), os.path.join(destinationpathcsv,file))
                print("CSV file moved.")
            if file.endswith('.txt'):
                print("TXT file moved")
                shutil.move(os.path.join(sourcepath,file), os.path.join(destinationpathtxt,file))
            if file.endswith('.pdf'):
                shutil.move(os.path.join(sourcepath,file), os.path.join(destinationpathpdf,file))
def on_modified(event):
    """Sort the files named in ``sourcefiles`` into their per-type folders.

    Installed as the handler's ``on_modified`` callback. ``event`` is accepted
    but never inspected. The logic is reproduced exactly as asked in the
    question; only the lost indentation is restored.
    """
    # NOTE: ``sourcefiles`` is the module-level listing, not a fresh one.
    for file in sourcefiles:
        # NOTE: ``file`` is a bare name from os.listdir(), so this check is
        # resolved against the current working directory, not ``sourcepath``.
        if os.path.exists(file):
            if file.endswith('.xls') or file.endswith('.xlsx'):
                shutil.move(os.path.join(sourcepath,file), os.path.join(destinationpath,file))
            if file.endswith('.csv'):
                shutil.move(os.path.join(sourcepath,file), os.path.join(destinationpathcsv,file))
            if file.endswith('.txt'):
                print("TXT file moved")
                shutil.move(os.path.join(sourcepath,file), os.path.join(destinationpathtxt,file))
            if file.endswith('.pdf'):
                shutil.move(os.path.join(sourcepath,file), os.path.join(destinationpathpdf,file))
if __name__ == "__main__":
    # Reproduced as asked in the question; only the lost indentation is restored.
    event_handler.on_modified = on_modified
    observer = Observer()
    observer.start()
    observer.schedule(event_handler, path, recursive=True)
    # join() waits for this observer's thread to end, so the statements below
    # are not reached while the first observer is still running.
    observer.join()
    event_handler.on_created = on_created
    observer = Observer()
    observer.start()
    observer.schedule(event_handler, path, recursive=True)
    observer.join()
    # Only the print() is guarded; the join() calls above sit outside the try.
    try:
        print("test")
    except KeyboardInterrupt:
        exit()
我不知道我是否解决了所有问题但是:
`listdir()` 给出的文件名不包含目录,因此即使只是用 `os.path.exists()` 做检查,也必须先用 `os.path.join()` 拼出完整路径:
if os.path.exists( os.path.join(sourcepath, file) ):
`listdir()` 只在被调用的那一刻给出一次文件名;要获取新的文件名,必须在事件处理函数内部(`for` 循环之前)重新调用它。
def on_created(event):
sourcefilenames = os.listdir(sourcepath)
for filename in sourcefilenames:
src = os.path.join(sourcepath, filename)
if os.path.exists(src):
# ... code ...
def on_modified(event):
sourcefilenames = os.listdir(sourcepath)
for filename in sourcefilenames:
src = os.path.join(sourcepath, filename)
if os.path.exists(src):
# ... code ...
`.join()` 会阻塞代码,一直等到你关闭程序;所以它先创建第一个 `Observer` 并等待它结束,之后才会创建第二个 `Observer`——但其实用一个 `Observer` 就够了。
你在 `on_created` 和 `on_modified` 中的代码看起来是相同的,因此两种情况可以共用一个函数:
def move_it(event):
sourcefilenames = os.listdir(sourcepath)
for filename in sourcefilenames:
src = os.path.join(sourcepath, filename)
if os.path.exists(src):
# ... code ...
if __name__ == "__main__":
    # One handler and one observer serve both event types.
    event_handler = FileSystemEventHandler()
    event_handler.on_modified = move_it
    event_handler.on_created = move_it
    observer = Observer()
    observer.start()
    observer.schedule(event_handler, sourcepath, recursive=True)
    # Blocks here until the observer thread ends.
    observer.join()
如果你想捕获 `Ctrl+C`,那么你应该把所有代码放在 `try/except` 中(或者至少把 `join()` 放在 `try/except` 中)。
我不知道你在 `Ctrl+C` 上遇到了什么问题,但它在 Linux 上可以正常工作。
if __name__ == "__main__":
    # Route both event kinds to the same callback.
    event_handler = FileSystemEventHandler()
    event_handler.on_modified = move_it
    event_handler.on_created = move_it

    observer = Observer()
    observer.schedule(event_handler, sourcepath, recursive=True)
    observer.start()
    try:
        # A timed join() returns control to the interpreter every second, so a
        # pending Ctrl+C can be raised; a bare join() may not be interruptible
        # on every platform (reportedly an issue on Windows).
        while observer.is_alive():
            observer.join(1)
    except KeyboardInterrupt:
        print('Stopped by Ctrl+C')
    finally:
        # Ask the observer thread to finish, then wait for it.
        observer.stop()
        observer.join()
一条建议:
如果使用字典,代码会更简单、更通用
# Sketch of the mapping: file extension -> destination folder (paths abbreviated).
{
".xls": "C:/.../xls",
".xlsx": "C:/.../xls",
# ...
}
这样你就可以用 `for` 循环来检查所有扩展名,而且随时可以向字典添加新的扩展名,而无需更改函数代码。
import os
import shutil
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
# Folder that is listed by move_it() and scheduled with the observer.
sourcepath = 'C:/Users/bhart/Downloads/'

# Lower-case file extension -> folder that files with that extension are moved to.
destinationpath = {
    '.xls': 'C:/Users/bhart/Downloads/xls',
    '.xlsx': 'C:/Users/bhart/Downloads/xls',
    '.csv': 'C:/Users/bhart/Downloads/csv',
    '.txt': 'C:/Users/bhart/Downloads/txt',
    '.pdf': 'C:/Users/bhart/Downloads/pdf',
}
def move_it(event):
    """Move every recognised file in ``sourcepath`` to its destination folder.

    Used as the callback for both "created" and "modified" events. ``event``
    is not inspected; the source folder is re-listed on every call so files
    that appeared since the last call are picked up. A failure on one file is
    reported and the remaining files are still processed.
    """
    for filename in os.listdir(sourcepath):
        src = os.path.join(sourcepath, filename)
        # Only regular files are candidates; sub-folders are left alone.
        if not os.path.isfile(src):
            continue
        lowered = filename.lower()
        for ext, dst in destinationpath.items():
            if not lowered.endswith(ext):
                continue
            try:
                # Create the destination on first use so the move cannot
                # fail merely because the folder is missing.
                os.makedirs(dst, exist_ok=True)
                shutil.move(src, os.path.join(dst, filename))
            except OSError as err:
                # e.g. the file vanished or is still locked by its writer;
                # report it rather than let the exception escape the callback.
                print('could not move:', filename, '->', dst, '-', err)
            else:
                print('move:', filename, '->', dst)
            break  # handled once its extension matched
if __name__ == "__main__":
    # Route both event kinds to the same callback.
    event_handler = FileSystemEventHandler()
    event_handler.on_modified = move_it
    event_handler.on_created = move_it

    observer = Observer()
    observer.schedule(event_handler, sourcepath, recursive=True)
    observer.start()
    try:
        # A timed join() returns control to the interpreter every second, so a
        # pending Ctrl+C can be raised; a bare join() may not be interruptible
        # on every platform (reportedly an issue on Windows).
        while observer.is_alive():
            observer.join(1)
    except KeyboardInterrupt:
        print('Stopped by Ctrl+C')
    finally:
        # Ask the observer thread to finish, then wait for it.
        observer.stop()
        observer.join()
编辑:
`event` 提供了 `event.src_path`、`event.event_type` 等属性,你可以用它们代替 `listdir()` 来获取文件路径。
import os
import shutil
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
# Folder that is scheduled with the observer.
sourcepath = 'C:/Users/bhart/Downloads/'

# Lower-case file extension -> folder that files with that extension are moved to.
destinationpath = {
    '.xls': 'C:/Users/bhart/Downloads/xls',
    '.xlsx': 'C:/Users/bhart/Downloads/xls',
    '.csv': 'C:/Users/bhart/Downloads/csv',
    '.txt': 'C:/Users/bhart/Downloads/txt',
    '.pdf': 'C:/Users/bhart/Downloads/pdf',
}
def move_it(event):
    """Move the file named by a watchdog event into the folder for its extension.

    Args:
        event: event object; ``is_directory`` and ``src_path`` are read here.
            ``event_type`` and ``key`` are also available and are handy to
            print while debugging.

    Directory events, files that no longer exist, and files already located
    in their destination folder are ignored. A failed move is reported
    instead of raising out of the callback.
    """
    if event.is_directory:
        return

    src = event.src_path
    src_dir, filename = os.path.split(src)

    # The same file can be reported more than once (for example "created"
    # followed by "modified"); by the second call it may already be gone.
    if not os.path.isfile(src):
        return

    lowered = filename.lower()
    for ext, dst in destinationpath.items():
        if not lowered.endswith(ext):
            continue

        # The destination folders are configured underneath the watched
        # folder and the watch is recursive, so skip files already in place.
        here = os.path.normcase(os.path.abspath(src_dir))
        there = os.path.normcase(os.path.abspath(dst))
        if here == there:
            return

        try:
            # Create the destination on first use so the move cannot fail
            # merely because the folder is missing.
            os.makedirs(dst, exist_ok=True)
            shutil.move(src, os.path.join(dst, filename))
        except OSError as err:
            # e.g. the file is still locked by its writer; report it rather
            # than let the exception escape the callback.
            print('could not move:', filename, '->', dst, '-', err)
        else:
            print('move:', filename, '->', dst)
        break  # handled once its extension matched
if __name__ == "__main__":
    # Route both event kinds to the same callback.
    event_handler = FileSystemEventHandler()
    event_handler.on_modified = move_it
    event_handler.on_created = move_it
    #event_handler.on_moved = move_it # ie. rename (but this need to check `dest_path`)

    observer = Observer()
    observer.schedule(event_handler, sourcepath, recursive=True)
    observer.start()
    try:
        # A timed join() returns control to the interpreter every second, so a
        # pending Ctrl+C can be raised; a bare join() may not be interruptible
        # on every platform (reportedly an issue on Windows).
        while observer.is_alive():
            observer.join(1)
    except KeyboardInterrupt:
        print('Stopped by Ctrl+C')
    finally:
        # Ask the observer thread to finish, then wait for it.
        observer.stop()
        observer.join()