如果修改了文件夹,如何使用看门狗并在主线程中执行某些操作?

How to use watchdog and do something in the main thread if a folder is modified?

我一直在寻找看门狗,如果在文件夹中创建文件,我找到了 this great library. I need to fit a DBSCAN 模型。 Joblib 用于 scikit-learn 的 DBSCAN 实现,如果 DBSCAN 运行ning 代码不在主线程中,joblib 不允许使用多处理。如果我使用看门狗,DBSCAN 代码不能在主线程中 运行。我该如何解决这个问题?您可以在下面找到看门狗脚本和一个简单的函数来测试它。当我 运行 main_watchdog.py 并在看门狗监视的文件夹中添加一个文件时,它 运行 是 Thread-1[ 中的 simple_function.py。与此同时,main_watchdog.py 运行s 在 MainThread.

PS: 一个解决方案可以在每次调用 simple_function.py 时启动一个子进程,但我担心这可能会导致如果在 watchdog 文件夹中创建了多个文件,则会出现一些问题。想象一下一次接收 10 或 100 或 10000 个文件...

#main_watchdog.py
import time
import logging
import threading
from watchdog.observers import Observer
from watchdog.events import LoggingEventHandler
from a_function import simple_function

class Event(LoggingEventHandler):
    def on_created(self, event):
        simple_function(x)

    def on_modified(self, event):
        simple_function(x)

if __name__ == "__main__":
    x = 1
    logging.basicConfig(level=logging.INFO,
                        format='%(asctime)s - %(message)s',
                        datefmt='%Y-%m-%d %H:%M:%S')
    # path = sys.argv[1] if len(sys.argv) > 1 else '.'
    path = '/path/to/watch/the/folder'
    event_handler = Event()
    observer = Observer()
    observer.schedule(event_handler, path, recursive=False)
    observer.start()
    try:
        while True:
            time.sleep(1)
            print(threading.current_thread().name)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()
#a_function.py
import threading
def simple_function(x):
    x += 1
    print(threading.current_thread().name)
    print(x)

如果我要正确理解这个问题,您需要在主线程中将业务逻辑 运行 并将观察者在后台线程中 运行 。 这可以很容易地解决,方法是使用线程库在后台调用观察者线程,然后通过 Queues 将这些事件的值传递给您的函数调用,这是线程之间的一种通信方式。

#main_watchdog.py
import time
import logging
import threading
from watchdog.observers import Observer
from watchdog.events import LoggingEventHandler
from a_function import simple_function
import sys
from queue import Queue

q = Queue()
x = 1
class Event(LoggingEventHandler):
    def on_created(self, event):
        q.put(x)
    def on_modified(self, event):
        q.put(x)

def run_observer():
    logging.basicConfig(level=logging.INFO,
                        format='%(asctime)s - %(message)s',
                        datefmt='%Y-%m-%d %H:%M:%S')
    path = sys.argv[1] if len(sys.argv) > 1 else '.'
    # path = '/path/to/watch/the/folder'
    event_handler = Event()
    observer = Observer()
    observer.schedule(event_handler, path, recursive=False)
    observer.start()
    while True:
        time.sleep(1)
        print(threading.currentThread().name)

if __name__ == "__main__":
    background_thread = threading.Thread(target=run_observer, args=())
    background_thread.daemon = True
    background_thread.start()
    print('Business logic')
    while True:
        val = q.get(True)
        simple_function(val)


其他功能保持不变