使用 Python watchdog 监视 2 个不同的目录并执行 2 个不同的处理程序
Python watchdog for 2 different directory and to execute 2 different handler
我想监视 2 个不同的目录,分别接收 2 个不同名称的 PDF 文件;每当收到 PDF 文件时,我想在 Event_Handler 中执行一些命令行脚本。我可以为此创建 2 个单独的脚本文件,但我希望用单个 Python 程序持续监视这两个目录。能否指导我在下面的代码中如何以及在何处加入 If-Else 条件,或采用其他方法来处理这个问题?
使用 2 个单独的代码文件时,我可以实现这一点。
我明白这段代码只对一个 class 生效,原因是其中的 while 循环(time.sleep(5))会一直阻塞;但我想不出替代方法。
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import subprocess
class Watcher1:
    """Watch the FinalOCR invoice directory and dispatch its events to Handler1."""

    # Raw string: in a normal literal "\U" starts a unicode escape, which is a
    # SyntaxError on Python 3.
    DIRECTORY_TO_WATCH1 = r"C:\Users\BPM_admin\Desktop\FinalOCR\Diageo\SampleInvoices"

    def __init__(self):
        self.observer = Observer()

    def run1(self):
        """Start watching (recursively) and block until interrupted.

        This call does not return until the loop is interrupted (e.g. Ctrl+C);
        the observer is always stopped and joined before leaving.
        """
        event_handler1 = Handler1()
        self.observer.schedule(event_handler1, self.DIRECTORY_TO_WATCH1, recursive=True)
        self.observer.start()
        try:
            # Keep the calling thread alive while the observer is running.
            while True:
                time.sleep(5)
        except KeyboardInterrupt:
            # Only the expected interruption is handled; other errors propagate.
            print("Error")
        finally:
            self.observer.stop()
            self.observer.join()
class Handler1(FileSystemEventHandler):
    """Launch the FinalOCR UiPath workflow whenever a file is created."""

    @staticmethod
    def on_any_event(event):
        """Run UiRobot for 'created' file events; ignore everything else.

        Directory events and events of any other type are skipped.
        subprocess.call waits for UiRobot to exit before returning.
        """
        if event.is_directory or event.event_type != 'created':
            return None
        # Raw strings: "\U" is an invalid unicode escape on Python 3 and "\a"
        # (in "\app-19.7.0") would silently turn into a BEL character.
        subprocess.call([
            r'C:\Users\BPM_admin\AppData\Local\UiPath\app-19.7.0\UiRobot.exe',
            '/file',
            r'C:\Users\BPM_admin\Desktop\FinalOCR\InvoiceAutomation\PDFReadwithNative.xaml',
        ])
class Watcher2:
    """Watch the SecondOCR invoice directory and dispatch its events to Handler2."""

    # Raw string: in a normal literal "\U" starts a unicode escape, which is a
    # SyntaxError on Python 3.
    DIRECTORY_TO_WATCH2 = r"C:\Users\BPM_admin\Desktop\SecondOCR\Diageo\SampleInvoices"

    def __init__(self):
        self.observer = Observer()

    def run2(self):
        """Start watching (recursively) and block until interrupted.

        This call does not return until the loop is interrupted (e.g. Ctrl+C);
        the observer is always stopped and joined before leaving.
        """
        event_handler2 = Handler2()
        self.observer.schedule(event_handler2, self.DIRECTORY_TO_WATCH2, recursive=True)
        self.observer.start()
        try:
            # Keep the calling thread alive while the observer is running.
            while True:
                time.sleep(5)
        except KeyboardInterrupt:
            # Only the expected interruption is handled; other errors propagate.
            print("Error")
        finally:
            self.observer.stop()
            self.observer.join()
class Handler2(FileSystemEventHandler):
    """Launch the SecondOCR UiPath workflow whenever a file is created."""

    @staticmethod
    def on_any_event(event):
        """Run UiRobot for 'created' file events; ignore everything else.

        Directory events and events of any other type are skipped.
        subprocess.call waits for UiRobot to exit before returning.
        """
        if event.is_directory or event.event_type != 'created':
            return None
        # Raw strings: "\U" is an invalid unicode escape on Python 3 and "\a"
        # (in "\app-19.7.0") would silently turn into a BEL character.
        # The argument list and call are now properly closed.
        subprocess.call([
            r'C:\Users\BPM_admin\AppData\Local\UiPath\app-19.7.0\UiRobot.exe',
            '/file',
            r'C:\Users\BPM_admin\Desktop\SecondOCR\InvoiceAutomation\PDFReadwithNative.xaml',
        ])
像这样我调用了 Main 方法:
if __name__ == '__main__':
    import threading

    # run1() and run2() each loop forever, so calling them one after the other
    # would never reach Watcher2. Run each in its own daemon thread instead.
    w1 = Watcher1()
    w2 = Watcher2()
    workers = [
        threading.Thread(target=w1.run1, daemon=True),
        threading.Thread(target=w2.run2, daemon=True),
    ]
    for worker in workers:
        worker.start()
    try:
        # Keep the main thread alive so Ctrl+C can end the program.
        while True:
            time.sleep(5)
    except KeyboardInterrupt:
        # Stop both observers; the daemon threads end with the process.
        w1.observer.stop()
        w2.observer.stop()
我会将 `while True` 循环从 class 中移到 `__main__` 中:
if __name__ == '__main__':
    # Start both watchers first, then share a single keep-alive loop.
    w1 = Watcher1()
    w2 = Watcher2()
    w1.start()
    w2.start()
    try:
        while True:
            time.sleep(5)
    except KeyboardInterrupt:
        # Ctrl+C is the expected way out; other errors propagate after cleanup.
        print("Error")
    finally:
        w1.stop()
        w2.stop()
        w1.join()
        w2.join()
另外,在各自独立的 class 中,event_handler1/event_handler2 和 DIRECTORY_TO_WATCH1/DIRECTORY_TO_WATCH2 这些名称不需要带数字 1/2:
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import subprocess
class Handler1(FileSystemEventHandler):
    """Launch the FinalOCR UiPath workflow whenever a file is created."""

    @staticmethod
    def on_any_event(event):
        """Run UiRobot for 'created' file events; ignore everything else.

        Directory events and events of any other type are skipped.
        subprocess.call waits for UiRobot to exit before returning.
        """
        if event.is_directory or event.event_type != 'created':
            return None
        # Raw strings: "\U" is an invalid unicode escape on Python 3 and "\a"
        # (in "\app-19.7.0") would silently turn into a BEL character.
        subprocess.call([
            r'C:\Users\BPM_admin\AppData\Local\UiPath\app-19.7.0\UiRobot.exe',
            '/file',
            r'C:\Users\BPM_admin\Desktop\FinalOCR\InvoiceAutomation\PDFReadwithNative.xaml',
        ])
class Handler2(FileSystemEventHandler):
    """Launch the SecondOCR UiPath workflow whenever a file is created."""

    @staticmethod
    def on_any_event(event):
        """Run UiRobot for 'created' file events; ignore everything else.

        Directory events and events of any other type are skipped.
        subprocess.call waits for UiRobot to exit before returning.
        """
        if event.is_directory or event.event_type != 'created':
            return None
        # Raw strings: "\U" is an invalid unicode escape on Python 3 and "\a"
        # (in "\app-19.7.0") would silently turn into a BEL character.
        subprocess.call([
            r'C:\Users\BPM_admin\AppData\Local\UiPath\app-19.7.0\UiRobot.exe',
            '/file',
            r'C:\Users\BPM_admin\Desktop\SecondOCR\InvoiceAutomation\PDFReadwithNative.xaml',
        ])
class Watcher1:
    """Observer wrapper for the FinalOCR invoice directory (events go to Handler1)."""

    # Raw string: "\U" in a normal literal is a SyntaxError on Python 3.
    DIRECTORY_TO_WATCH = r"C:\Users\BPM_admin\Desktop\FinalOCR\Diageo\SampleInvoices"

    def __init__(self):
        self.observer = Observer()

    def start(self):
        """Schedule Handler1 on the directory (recursively) and start the observer."""
        event_handler = Handler1()
        self.observer.schedule(event_handler, self.DIRECTORY_TO_WATCH, recursive=True)
        self.observer.start()

    def stop(self):
        """Ask the observer to stop."""
        self.observer.stop()

    def join(self):
        """Wait for the observer to finish."""
        self.observer.join()
class Watcher2:
    """Observer wrapper for the SecondOCR invoice directory (events go to Handler2)."""

    # Raw string: "\U" in a normal literal is a SyntaxError on Python 3.
    DIRECTORY_TO_WATCH = r"C:\Users\BPM_admin\Desktop\SecondOCR\Diageo\SampleInvoices"

    def __init__(self):
        self.observer = Observer()

    def start(self):
        """Schedule Handler2 on the directory (recursively) and start the observer."""
        event_handler = Handler2()
        self.observer.schedule(event_handler, self.DIRECTORY_TO_WATCH, recursive=True)
        self.observer.start()

    def stop(self):
        """Ask the observer to stop."""
        self.observer.stop()

    def join(self):
        """Wait for the observer to finish."""
        self.observer.join()
if __name__ == '__main__':
    # Start both watchers first, then share a single keep-alive loop.
    w1 = Watcher1()
    w2 = Watcher2()
    w1.start()
    w2.start()
    try:
        while True:
            time.sleep(5)
    except KeyboardInterrupt:
        # Ctrl+C is the expected way out; other errors propagate after cleanup.
        print("Error")
    finally:
        w1.stop()
        w2.stop()
        w1.join()
        w2.join()
因为这两个 class 非常相似,所以可以合并为一个接受不同参数的 class:
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import subprocess
class Handler1(FileSystemEventHandler):
    """Launch the FinalOCR UiPath workflow whenever a file is created."""

    @staticmethod
    def on_any_event(event):
        """Run UiRobot for 'created' file events; ignore everything else.

        Directory events and events of any other type are skipped.
        subprocess.call waits for UiRobot to exit before returning.
        """
        if event.is_directory or event.event_type != 'created':
            return None
        # Raw strings: "\U" is an invalid unicode escape on Python 3 and "\a"
        # (in "\app-19.7.0") would silently turn into a BEL character.
        subprocess.call([
            r'C:\Users\BPM_admin\AppData\Local\UiPath\app-19.7.0\UiRobot.exe',
            '/file',
            r'C:\Users\BPM_admin\Desktop\FinalOCR\InvoiceAutomation\PDFReadwithNative.xaml',
        ])
class Handler2(FileSystemEventHandler):
    """Launch the SecondOCR UiPath workflow whenever a file is created."""

    @staticmethod
    def on_any_event(event):
        """Run UiRobot for 'created' file events; ignore everything else.

        Directory events and events of any other type are skipped.
        subprocess.call waits for UiRobot to exit before returning.
        """
        if event.is_directory or event.event_type != 'created':
            return None
        # Raw strings: "\U" is an invalid unicode escape on Python 3 and "\a"
        # (in "\app-19.7.0") would silently turn into a BEL character.
        subprocess.call([
            r'C:\Users\BPM_admin\AppData\Local\UiPath\app-19.7.0\UiRobot.exe',
            '/file',
            r'C:\Users\BPM_admin\Desktop\SecondOCR\InvoiceAutomation\PDFReadwithNative.xaml',
        ])
class Watcher:
    """Pair one directory with one event handler on a dedicated Observer."""

    def __init__(self, directory, handler):
        """Store the directory to watch and the handler that receives its events."""
        self.directory = directory
        self.handler = handler
        self.observer = Observer()

    def start(self):
        """Schedule the handler on the directory (recursively) and start the observer."""
        self.observer.schedule(self.handler, self.directory, recursive=True)
        self.observer.start()

    def stop(self):
        """Ask the observer to stop."""
        self.observer.stop()

    def join(self):
        """Wait for the observer to finish."""
        self.observer.join()
if __name__ == '__main__':
    # Raw strings keep the Windows backslashes literal ("\U" would otherwise
    # be parsed as a unicode escape and fail on Python 3).
    w1 = Watcher(r"C:\Users\BPM_admin\Desktop\FinalOCR\Diageo\SampleInvoices", Handler1())
    w2 = Watcher(r"C:\Users\BPM_admin\Desktop\SecondOCR\Diageo\SampleInvoices", Handler2())
    w1.start()
    w2.start()
    try:
        # One shared keep-alive loop for both watchers.
        while True:
            time.sleep(5)
    except KeyboardInterrupt:
        # Ctrl+C is the expected way out; other errors propagate after cleanup.
        print("Error")
    finally:
        w1.stop()
        w2.stop()
        w1.join()
        w2.join()
处理程序也非常相似,因此您可以将它们简化为一个具有不同参数的处理程序
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import subprocess
class Handler(FileSystemEventHandler):
    """Run UiRobot with a configurable workflow file whenever a file is created."""

    def __init__(self, path):
        """Remember *path*, the workflow file passed to UiRobot after '/file'."""
        super().__init__()
        self.path = path

    def on_any_event(self, event):
        """Run UiRobot for 'created' file events; ignore everything else.

        Must be an instance method (takes ``self``) because it reads
        ``self.path``. subprocess.call waits for UiRobot to exit.
        """
        if event.is_directory or event.event_type != 'created':
            return None
        # Raw string: "\U" is an invalid unicode escape on Python 3 and "\a"
        # (in "\app-19.7.0") would silently turn into a BEL character.
        subprocess.call([
            r'C:\Users\BPM_admin\AppData\Local\UiPath\app-19.7.0\UiRobot.exe',
            '/file',
            self.path,
        ])
class Watcher:
    """Pair one directory with one event handler on a dedicated Observer."""

    def __init__(self, directory, handler):
        """Store the directory to watch and the handler that receives its events."""
        self.directory = directory
        self.handler = handler
        self.observer = Observer()

    def start(self):
        """Schedule the handler on the directory (recursively) and start the observer."""
        self.observer.schedule(self.handler, self.directory, recursive=True)
        self.observer.start()

    def stop(self):
        """Ask the observer to stop."""
        self.observer.stop()

    def join(self):
        """Wait for the observer to finish."""
        self.observer.join()
if __name__ == '__main__':
    # Raw strings keep the Windows backslashes literal ("\U" would otherwise
    # be parsed as a unicode escape and fail on Python 3).
    handler1 = Handler(r'C:\Users\BPM_admin\Desktop\FinalOCR\InvoiceAutomation\PDFReadwithNative.xaml')
    handler2 = Handler(r'C:\Users\BPM_admin\Desktop\SecondOCR\InvoiceAutomation\PDFReadwithNative.xaml')
    w1 = Watcher(r"C:\Users\BPM_admin\Desktop\FinalOCR\Diageo\SampleInvoices", handler1)
    w2 = Watcher(r"C:\Users\BPM_admin\Desktop\SecondOCR\Diageo\SampleInvoices", handler2)
    w1.start()
    w2.start()
    try:
        # One shared keep-alive loop for both watchers.
        while True:
            time.sleep(5)
    except KeyboardInterrupt:
        # Ctrl+C is the expected way out; other errors propagate after cleanup.
        print("Error")
    finally:
        w1.stop()
        w2.stop()
        w1.join()
        w2.join()
我想监视 2 个不同的目录,分别接收 2 个不同名称的 PDF 文件;每当收到 PDF 文件时,我想在 Event_Handler 中执行一些命令行脚本。我可以为此创建 2 个单独的脚本文件,但我希望用单个 Python 程序持续监视这两个目录。能否指导我在下面的代码中如何以及在何处加入 If-Else 条件,或采用其他方法来处理这个问题?
使用 2 个单独的代码文件时,我可以实现这一点。我明白这段代码只对一个 class 生效,原因是其中的 while 循环(time.sleep(5))会一直阻塞;但我想不出替代方法。
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import subprocess
class Watcher1:
    """Watch the FinalOCR invoice directory and dispatch its events to Handler1."""

    # Raw string: in a normal literal "\U" starts a unicode escape, which is a
    # SyntaxError on Python 3.
    DIRECTORY_TO_WATCH1 = r"C:\Users\BPM_admin\Desktop\FinalOCR\Diageo\SampleInvoices"

    def __init__(self):
        self.observer = Observer()

    def run1(self):
        """Start watching (recursively) and block until interrupted.

        This call does not return until the loop is interrupted (e.g. Ctrl+C);
        the observer is always stopped and joined before leaving.
        """
        event_handler1 = Handler1()
        self.observer.schedule(event_handler1, self.DIRECTORY_TO_WATCH1, recursive=True)
        self.observer.start()
        try:
            # Keep the calling thread alive while the observer is running.
            while True:
                time.sleep(5)
        except KeyboardInterrupt:
            # Only the expected interruption is handled; other errors propagate.
            print("Error")
        finally:
            self.observer.stop()
            self.observer.join()
class Handler1(FileSystemEventHandler):
    """Launch the FinalOCR UiPath workflow whenever a file is created."""

    @staticmethod
    def on_any_event(event):
        """Run UiRobot for 'created' file events; ignore everything else.

        Directory events and events of any other type are skipped.
        subprocess.call waits for UiRobot to exit before returning.
        """
        if event.is_directory or event.event_type != 'created':
            return None
        # Raw strings: "\U" is an invalid unicode escape on Python 3 and "\a"
        # (in "\app-19.7.0") would silently turn into a BEL character.
        subprocess.call([
            r'C:\Users\BPM_admin\AppData\Local\UiPath\app-19.7.0\UiRobot.exe',
            '/file',
            r'C:\Users\BPM_admin\Desktop\FinalOCR\InvoiceAutomation\PDFReadwithNative.xaml',
        ])
class Watcher2:
    """Watch the SecondOCR invoice directory and dispatch its events to Handler2."""

    # Raw string: in a normal literal "\U" starts a unicode escape, which is a
    # SyntaxError on Python 3.
    DIRECTORY_TO_WATCH2 = r"C:\Users\BPM_admin\Desktop\SecondOCR\Diageo\SampleInvoices"

    def __init__(self):
        self.observer = Observer()

    def run2(self):
        """Start watching (recursively) and block until interrupted.

        This call does not return until the loop is interrupted (e.g. Ctrl+C);
        the observer is always stopped and joined before leaving.
        """
        event_handler2 = Handler2()
        self.observer.schedule(event_handler2, self.DIRECTORY_TO_WATCH2, recursive=True)
        self.observer.start()
        try:
            # Keep the calling thread alive while the observer is running.
            while True:
                time.sleep(5)
        except KeyboardInterrupt:
            # Only the expected interruption is handled; other errors propagate.
            print("Error")
        finally:
            self.observer.stop()
            self.observer.join()
class Handler2(FileSystemEventHandler):
    """Launch the SecondOCR UiPath workflow whenever a file is created."""

    @staticmethod
    def on_any_event(event):
        """Run UiRobot for 'created' file events; ignore everything else.

        Directory events and events of any other type are skipped.
        subprocess.call waits for UiRobot to exit before returning.
        """
        if event.is_directory or event.event_type != 'created':
            return None
        # Raw strings: "\U" is an invalid unicode escape on Python 3 and "\a"
        # (in "\app-19.7.0") would silently turn into a BEL character.
        # The argument list and call are now properly closed.
        subprocess.call([
            r'C:\Users\BPM_admin\AppData\Local\UiPath\app-19.7.0\UiRobot.exe',
            '/file',
            r'C:\Users\BPM_admin\Desktop\SecondOCR\InvoiceAutomation\PDFReadwithNative.xaml',
        ])
像这样我调用了 Main 方法:
if __name__ == '__main__':
    import threading

    # run1() and run2() each loop forever, so calling them one after the other
    # would never reach Watcher2. Run each in its own daemon thread instead.
    w1 = Watcher1()
    w2 = Watcher2()
    workers = [
        threading.Thread(target=w1.run1, daemon=True),
        threading.Thread(target=w2.run2, daemon=True),
    ]
    for worker in workers:
        worker.start()
    try:
        # Keep the main thread alive so Ctrl+C can end the program.
        while True:
            time.sleep(5)
    except KeyboardInterrupt:
        # Stop both observers; the daemon threads end with the process.
        w1.observer.stop()
        w2.observer.stop()
我会将 `while True` 循环从 class 中移到 `__main__` 中:
if __name__ == '__main__':
    # Start both watchers first, then share a single keep-alive loop.
    w1 = Watcher1()
    w2 = Watcher2()
    w1.start()
    w2.start()
    try:
        while True:
            time.sleep(5)
    except KeyboardInterrupt:
        # Ctrl+C is the expected way out; other errors propagate after cleanup.
        print("Error")
    finally:
        w1.stop()
        w2.stop()
        w1.join()
        w2.join()
另外,在各自独立的 class 中,event_handler1/event_handler2 和 DIRECTORY_TO_WATCH1/DIRECTORY_TO_WATCH2 这些名称不需要带数字 1/2:
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import subprocess
class Handler1(FileSystemEventHandler):
    """Launch the FinalOCR UiPath workflow whenever a file is created."""

    @staticmethod
    def on_any_event(event):
        """Run UiRobot for 'created' file events; ignore everything else.

        Directory events and events of any other type are skipped.
        subprocess.call waits for UiRobot to exit before returning.
        """
        if event.is_directory or event.event_type != 'created':
            return None
        # Raw strings: "\U" is an invalid unicode escape on Python 3 and "\a"
        # (in "\app-19.7.0") would silently turn into a BEL character.
        subprocess.call([
            r'C:\Users\BPM_admin\AppData\Local\UiPath\app-19.7.0\UiRobot.exe',
            '/file',
            r'C:\Users\BPM_admin\Desktop\FinalOCR\InvoiceAutomation\PDFReadwithNative.xaml',
        ])
class Handler2(FileSystemEventHandler):
    """Launch the SecondOCR UiPath workflow whenever a file is created."""

    @staticmethod
    def on_any_event(event):
        """Run UiRobot for 'created' file events; ignore everything else.

        Directory events and events of any other type are skipped.
        subprocess.call waits for UiRobot to exit before returning.
        """
        if event.is_directory or event.event_type != 'created':
            return None
        # Raw strings: "\U" is an invalid unicode escape on Python 3 and "\a"
        # (in "\app-19.7.0") would silently turn into a BEL character.
        subprocess.call([
            r'C:\Users\BPM_admin\AppData\Local\UiPath\app-19.7.0\UiRobot.exe',
            '/file',
            r'C:\Users\BPM_admin\Desktop\SecondOCR\InvoiceAutomation\PDFReadwithNative.xaml',
        ])
class Watcher1:
    """Observer wrapper for the FinalOCR invoice directory (events go to Handler1)."""

    # Raw string: "\U" in a normal literal is a SyntaxError on Python 3.
    DIRECTORY_TO_WATCH = r"C:\Users\BPM_admin\Desktop\FinalOCR\Diageo\SampleInvoices"

    def __init__(self):
        self.observer = Observer()

    def start(self):
        """Schedule Handler1 on the directory (recursively) and start the observer."""
        event_handler = Handler1()
        self.observer.schedule(event_handler, self.DIRECTORY_TO_WATCH, recursive=True)
        self.observer.start()

    def stop(self):
        """Ask the observer to stop."""
        self.observer.stop()

    def join(self):
        """Wait for the observer to finish."""
        self.observer.join()
class Watcher2:
    """Observer wrapper for the SecondOCR invoice directory (events go to Handler2)."""

    # Raw string: "\U" in a normal literal is a SyntaxError on Python 3.
    DIRECTORY_TO_WATCH = r"C:\Users\BPM_admin\Desktop\SecondOCR\Diageo\SampleInvoices"

    def __init__(self):
        self.observer = Observer()

    def start(self):
        """Schedule Handler2 on the directory (recursively) and start the observer."""
        event_handler = Handler2()
        self.observer.schedule(event_handler, self.DIRECTORY_TO_WATCH, recursive=True)
        self.observer.start()

    def stop(self):
        """Ask the observer to stop."""
        self.observer.stop()

    def join(self):
        """Wait for the observer to finish."""
        self.observer.join()
if __name__ == '__main__':
    # Start both watchers first, then share a single keep-alive loop.
    w1 = Watcher1()
    w2 = Watcher2()
    w1.start()
    w2.start()
    try:
        while True:
            time.sleep(5)
    except KeyboardInterrupt:
        # Ctrl+C is the expected way out; other errors propagate after cleanup.
        print("Error")
    finally:
        w1.stop()
        w2.stop()
        w1.join()
        w2.join()
因为这两个 class 非常相似,所以可以合并为一个接受不同参数的 class:
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import subprocess
class Handler1(FileSystemEventHandler):
    """Launch the FinalOCR UiPath workflow whenever a file is created."""

    @staticmethod
    def on_any_event(event):
        """Run UiRobot for 'created' file events; ignore everything else.

        Directory events and events of any other type are skipped.
        subprocess.call waits for UiRobot to exit before returning.
        """
        if event.is_directory or event.event_type != 'created':
            return None
        # Raw strings: "\U" is an invalid unicode escape on Python 3 and "\a"
        # (in "\app-19.7.0") would silently turn into a BEL character.
        subprocess.call([
            r'C:\Users\BPM_admin\AppData\Local\UiPath\app-19.7.0\UiRobot.exe',
            '/file',
            r'C:\Users\BPM_admin\Desktop\FinalOCR\InvoiceAutomation\PDFReadwithNative.xaml',
        ])
class Handler2(FileSystemEventHandler):
    """Launch the SecondOCR UiPath workflow whenever a file is created."""

    @staticmethod
    def on_any_event(event):
        """Run UiRobot for 'created' file events; ignore everything else.

        Directory events and events of any other type are skipped.
        subprocess.call waits for UiRobot to exit before returning.
        """
        if event.is_directory or event.event_type != 'created':
            return None
        # Raw strings: "\U" is an invalid unicode escape on Python 3 and "\a"
        # (in "\app-19.7.0") would silently turn into a BEL character.
        subprocess.call([
            r'C:\Users\BPM_admin\AppData\Local\UiPath\app-19.7.0\UiRobot.exe',
            '/file',
            r'C:\Users\BPM_admin\Desktop\SecondOCR\InvoiceAutomation\PDFReadwithNative.xaml',
        ])
class Watcher:
    """Pair one directory with one event handler on a dedicated Observer."""

    def __init__(self, directory, handler):
        """Store the directory to watch and the handler that receives its events."""
        self.directory = directory
        self.handler = handler
        self.observer = Observer()

    def start(self):
        """Schedule the handler on the directory (recursively) and start the observer."""
        self.observer.schedule(self.handler, self.directory, recursive=True)
        self.observer.start()

    def stop(self):
        """Ask the observer to stop."""
        self.observer.stop()

    def join(self):
        """Wait for the observer to finish."""
        self.observer.join()
if __name__ == '__main__':
    # Raw strings keep the Windows backslashes literal ("\U" would otherwise
    # be parsed as a unicode escape and fail on Python 3).
    w1 = Watcher(r"C:\Users\BPM_admin\Desktop\FinalOCR\Diageo\SampleInvoices", Handler1())
    w2 = Watcher(r"C:\Users\BPM_admin\Desktop\SecondOCR\Diageo\SampleInvoices", Handler2())
    w1.start()
    w2.start()
    try:
        # One shared keep-alive loop for both watchers.
        while True:
            time.sleep(5)
    except KeyboardInterrupt:
        # Ctrl+C is the expected way out; other errors propagate after cleanup.
        print("Error")
    finally:
        w1.stop()
        w2.stop()
        w1.join()
        w2.join()
处理程序也非常相似,因此您可以将它们简化为一个具有不同参数的处理程序
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import subprocess
class Handler(FileSystemEventHandler):
    """Run UiRobot with a configurable workflow file whenever a file is created."""

    def __init__(self, path):
        """Remember *path*, the workflow file passed to UiRobot after '/file'."""
        super().__init__()
        self.path = path

    def on_any_event(self, event):
        """Run UiRobot for 'created' file events; ignore everything else.

        Must be an instance method (takes ``self``) because it reads
        ``self.path``. subprocess.call waits for UiRobot to exit.
        """
        if event.is_directory or event.event_type != 'created':
            return None
        # Raw string: "\U" is an invalid unicode escape on Python 3 and "\a"
        # (in "\app-19.7.0") would silently turn into a BEL character.
        subprocess.call([
            r'C:\Users\BPM_admin\AppData\Local\UiPath\app-19.7.0\UiRobot.exe',
            '/file',
            self.path,
        ])
class Watcher:
    """Pair one directory with one event handler on a dedicated Observer."""

    def __init__(self, directory, handler):
        """Store the directory to watch and the handler that receives its events."""
        self.directory = directory
        self.handler = handler
        self.observer = Observer()

    def start(self):
        """Schedule the handler on the directory (recursively) and start the observer."""
        self.observer.schedule(self.handler, self.directory, recursive=True)
        self.observer.start()

    def stop(self):
        """Ask the observer to stop."""
        self.observer.stop()

    def join(self):
        """Wait for the observer to finish."""
        self.observer.join()
if __name__ == '__main__':
    # Raw strings keep the Windows backslashes literal ("\U" would otherwise
    # be parsed as a unicode escape and fail on Python 3).
    handler1 = Handler(r'C:\Users\BPM_admin\Desktop\FinalOCR\InvoiceAutomation\PDFReadwithNative.xaml')
    handler2 = Handler(r'C:\Users\BPM_admin\Desktop\SecondOCR\InvoiceAutomation\PDFReadwithNative.xaml')
    w1 = Watcher(r"C:\Users\BPM_admin\Desktop\FinalOCR\Diageo\SampleInvoices", handler1)
    w2 = Watcher(r"C:\Users\BPM_admin\Desktop\SecondOCR\Diageo\SampleInvoices", handler2)
    w1.start()
    w2.start()
    try:
        # One shared keep-alive loop for both watchers.
        while True:
            time.sleep(5)
    except KeyboardInterrupt:
        # Ctrl+C is the expected way out; other errors propagate after cleanup.
        print("Error")
    finally:
        w1.stop()
        w2.stop()
        w1.join()
        w2.join()