如果修改了文件夹,如何使用看门狗并在主线程中执行某些操作?
How to use watchdog and do something in the main thread if a folder is modified?
我一直在寻找看门狗,如果在文件夹中创建文件,我找到了 this great library. I need to fit a DBSCAN 模型。 Joblib 用于 scikit-learn 的 DBSCAN 实现,如果 DBSCAN 运行ning 代码不在主线程中,joblib 不允许使用多处理。如果我使用看门狗,DBSCAN 代码不能在主线程中 运行。我该如何解决这个问题?您可以在下面找到看门狗脚本和一个简单的函数来测试它。当我 运行 main_watchdog.py
并在看门狗监视的文件夹中添加一个文件时,它 运行 是 Thread-1[ 中的 simple_function.py
。与此同时,main_watchdog.py
运行s 在 MainThread.
中
PS: 一个解决方案可以在每次调用 simple_function.py
时启动一个子进程,但我担心这可能会导致如果在 watchdog 文件夹中创建了多个文件,则会出现一些问题。想象一下一次接收 10 或 100 或 10000 个文件...
#main_watchdog.py
import time
import logging
import threading
from watchdog.observers import Observer
from watchdog.events import LoggingEventHandler
from a_function import simple_function
class Event(LoggingEventHandler):
def on_created(self, event):
simple_function(x)
def on_modified(self, event):
simple_function(x)
if __name__ == "__main__":
x = 1
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
# path = sys.argv[1] if len(sys.argv) > 1 else '.'
path = '/path/to/watch/the/folder'
event_handler = Event()
observer = Observer()
observer.schedule(event_handler, path, recursive=False)
observer.start()
try:
while True:
time.sleep(1)
print(threading.current_thread().name)
except KeyboardInterrupt:
observer.stop()
observer.join()
#a_function.py
import threading
def simple_function(x):
x += 1
print(threading.current_thread().name)
print(x)
如果我要正确理解这个问题,您需要在主线程中将业务逻辑 运行 并将观察者在后台线程中 运行 。
这可以很容易地解决,方法是使用线程库在后台调用观察者线程,然后通过 Queues
将这些事件的值传递给您的函数调用,这是线程之间的一种通信方式。
#main_watchdog.py
import time
import logging
import threading
from watchdog.observers import Observer
from watchdog.events import LoggingEventHandler
from a_function import simple_function
import sys
from queue import Queue
q = Queue()
x = 1
class Event(LoggingEventHandler):
def on_created(self, event):
q.put(x)
def on_modified(self, event):
q.put(x)
def run_observer():
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
path = sys.argv[1] if len(sys.argv) > 1 else '.'
# path = '/path/to/watch/the/folder'
event_handler = Event()
observer = Observer()
observer.schedule(event_handler, path, recursive=False)
observer.start()
while True:
time.sleep(1)
print(threading.currentThread().name)
if __name__ == "__main__":
background_thread = threading.Thread(target=run_observer, args=())
background_thread.daemon = True
background_thread.start()
print('Business logic')
while True:
val = q.get(True)
simple_function(val)
其他功能保持不变
我一直在寻找看门狗,如果在文件夹中创建文件,我找到了 this great library. I need to fit a DBSCAN 模型。 Joblib 用于 scikit-learn 的 DBSCAN 实现,如果 DBSCAN 运行ning 代码不在主线程中,joblib 不允许使用多处理。如果我使用看门狗,DBSCAN 代码不能在主线程中 运行。我该如何解决这个问题?您可以在下面找到看门狗脚本和一个简单的函数来测试它。当我 运行 main_watchdog.py
并在看门狗监视的文件夹中添加一个文件时,它 运行 是 Thread-1[ 中的 simple_function.py
。与此同时,main_watchdog.py
运行s 在 MainThread.
PS: 一个解决方案可以在每次调用 simple_function.py
时启动一个子进程,但我担心这可能会导致如果在 watchdog 文件夹中创建了多个文件,则会出现一些问题。想象一下一次接收 10 或 100 或 10000 个文件...
#main_watchdog.py
import time
import logging
import threading
from watchdog.observers import Observer
from watchdog.events import LoggingEventHandler
from a_function import simple_function
class Event(LoggingEventHandler):
def on_created(self, event):
simple_function(x)
def on_modified(self, event):
simple_function(x)
if __name__ == "__main__":
x = 1
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
# path = sys.argv[1] if len(sys.argv) > 1 else '.'
path = '/path/to/watch/the/folder'
event_handler = Event()
observer = Observer()
observer.schedule(event_handler, path, recursive=False)
observer.start()
try:
while True:
time.sleep(1)
print(threading.current_thread().name)
except KeyboardInterrupt:
observer.stop()
observer.join()
#a_function.py
import threading
def simple_function(x):
x += 1
print(threading.current_thread().name)
print(x)
如果我要正确理解这个问题,您需要在主线程中将业务逻辑 运行 并将观察者在后台线程中 运行 。
这可以很容易地解决,方法是使用线程库在后台调用观察者线程,然后通过 Queues
将这些事件的值传递给您的函数调用,这是线程之间的一种通信方式。
#main_watchdog.py
import time
import logging
import threading
from watchdog.observers import Observer
from watchdog.events import LoggingEventHandler
from a_function import simple_function
import sys
from queue import Queue
q = Queue()
x = 1
class Event(LoggingEventHandler):
def on_created(self, event):
q.put(x)
def on_modified(self, event):
q.put(x)
def run_observer():
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
path = sys.argv[1] if len(sys.argv) > 1 else '.'
# path = '/path/to/watch/the/folder'
event_handler = Event()
observer = Observer()
observer.schedule(event_handler, path, recursive=False)
observer.start()
while True:
time.sleep(1)
print(threading.currentThread().name)
if __name__ == "__main__":
background_thread = threading.Thread(target=run_observer, args=())
background_thread.daemon = True
background_thread.start()
print('Business logic')
while True:
val = q.get(True)
simple_function(val)
其他功能保持不变