看门狗作为后台线程 - Python
Watchdog as background thread - Python
我在文件 main.py 中设置了这个看门狗设置,它应该 运行 观察者作为守护进程(在后台),在 运行 正在运行时做一些事情(这Watcher 应该做出反应,例如创建一个测试文件)然后终止 Watcher。
from watchdog.events import PatternMatchingEventHandler
from watchdog.observers import Observer
import time
import threading
class Watcher:
def watch_dir(self):
patterns = "*"
event_handler = PatternMatchingEventHandler(patterns)
event_handler.on_any_event = self.on_any_event
observer = Observer()
observer.schedule(event_handler, 'someDirToObserve')
observer.start()
try:
while True:
time.sleep(2)
except Exception as e:
observer.stop()
time.sleep(30)
observer.join()
def on_any_event(self, event):
print(event)
def start_watcher(watcher):
watcher.watch_dir()
def main():
print("Start")
watcher = Watcher()
thread = threading.Thread(target=start_watcher(watcher), daemon=True)
thread.start()
test_file = 'SomeTestFile'
test_file.unlink()
with test_file.open('w') as f:
f.write('Test')
print("Oh shit")
thread.join()
if __name__ == '__main__':
main()
如何在执行操作时在后台 运行 观察者以及如何正确终止它?
这个解决方案对我有用。主线程上的 Flask 应用程序和另一个线程上的观察者。
flask application with watchdog observer
我在文件 main.py 中设置了这个看门狗设置,它应该 运行 观察者作为守护进程(在后台),在 运行 正在运行时做一些事情(这Watcher 应该做出反应,例如创建一个测试文件)然后终止 Watcher。
from watchdog.events import PatternMatchingEventHandler
from watchdog.observers import Observer
import time
import threading
class Watcher:
def watch_dir(self):
patterns = "*"
event_handler = PatternMatchingEventHandler(patterns)
event_handler.on_any_event = self.on_any_event
observer = Observer()
observer.schedule(event_handler, 'someDirToObserve')
observer.start()
try:
while True:
time.sleep(2)
except Exception as e:
observer.stop()
time.sleep(30)
observer.join()
def on_any_event(self, event):
print(event)
def start_watcher(watcher):
watcher.watch_dir()
def main():
print("Start")
watcher = Watcher()
thread = threading.Thread(target=start_watcher(watcher), daemon=True)
thread.start()
test_file = 'SomeTestFile'
test_file.unlink()
with test_file.open('w') as f:
f.write('Test')
print("Oh shit")
thread.join()
if __name__ == '__main__':
main()
如何在执行操作时在后台 运行 观察者以及如何正确终止它?
这个解决方案对我有用。主线程上的 Flask 应用程序和另一个线程上的观察者。
flask application with watchdog observer