在 Python watchdog 中并行处理多个 on_created 事件

Process multiple on_created events in parallel with Python watchdog

我正在尝试检测某个目录中是否有新文件被创建;如果有,我想对它进行处理(处理需要 10 分钟才能输出结果),而在此期间文件夹中还会继续创建其他新文件。

如何结合 multiprocessing 注册 watchdog 的 on_created 回调,使它不必等待上一个文件处理完成,而是每创建一个文件就启动一个新进程来处理?

import time
import datetime
from watchdog.observers import Observer
from watchdog.events import PatternMatchingEventHandler
import multiprocessing as mp
def doProcessing(filename):
    """Process the newly created file *filename* (placeholder: only prints).

    Defined before the ``__main__`` block on purpose: that block loops
    forever, so any ``def`` placed after it would never run and the
    callback would fail with ``NameError``.
    """
    print("Processing")


def on_created(event):
    """Handle a watchdog "created" event: wait, then process the new file.

    watchdog delivers events from its observer thread, so while this runs
    no other event is handled -- files are processed one after another.
    """
    print(f"hey, {event.src_path} has been created!")
    # NOTE(review): presumably gives the writer time to finish the file.
    time.sleep(10)
    doProcessing(event.src_path)
    print(f"hey for {event.src_path}")


if __name__ == "__main__":
    # patterns/ignore_patterns are collections of glob patterns, not strings.
    patterns = ["*"]
    ignore_patterns = None
    ignore_directories = False
    case_sensitive = True
    my_event_handler = PatternMatchingEventHandler(
        patterns=patterns,
        ignore_patterns=ignore_patterns,
        ignore_directories=ignore_directories,
        case_sensitive=case_sensitive,
    )
    # Attach the callback before the observer starts so no event is missed.
    my_event_handler.on_created = on_created
    path = r"D:\watcher"  # raw string: "\w" is an invalid escape sequence
    go_recursively = True
    my_observer = Observer()
    my_observer.schedule(my_event_handler, path, recursive=go_recursively)
    my_observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        my_observer.stop()
    my_observer.join()

抱歉代码中有这么多被注释掉的部分。关键在于 pool.apply_async(print_func, (event,)):事件一旦被推入队列,process_load_queue 函数就会从队列中逐个取出事件,并异步运行 print_func。

# -*- coding: utf-8 -*-
"""
Created on Mon Oct 21 22:02:55 2019

@author: 1009758
"""
import os
import time
import datetime
from watchdog.observers import Observer
from watchdog.events import PatternMatchingEventHandler
import multiprocessing as mp
from multiprocessing import Process
from multiprocessing import Queue
import threading

from multiprocessing import Pool
# Worker-process budget: presumably leaves one CPU free for the main process.
# Clamped to at least 1 because cpu_count() - 1 is 0 on a single-core machine
# and Pool(processes=0) raises ValueError.
PROCESSES = max(1, mp.cpu_count() - 1)
NUMBER_OF_TASKS = 10  # NOTE(review): not referenced in the visible code
class FileLoaderWatchdog(PatternMatchingEventHandler):
    ''' Watch a nominated directory and queue an event for every file
        created there that matches *patterns*.

        No file work is done here: the handler only pushes the event onto
        ``queue`` and returns, leaving the processing to whatever consumes
        that queue.
    '''

    def __init__(self, queue, patterns):
        super().__init__(patterns=patterns)
        self.queue = queue  # any object with put(), e.g. a Queue()

    def process(self, event):
        '''Push *event* onto the queue for the consumer.

        event.event_type
            'modified' | 'created' | 'moved' | 'deleted'
        event.is_directory
            True | False
        event.src_path
            path/to/observed/file
        '''
        self.queue.put(event)

    def on_created(self, event):
        '''Queue a newly created file and log (UTC) when it was queued.'''
        self.process(event)
        # Aware UTC time; datetime.utcnow() is deprecated since Python 3.12.
        now = datetime.datetime.now(datetime.timezone.utc)
        print("{0} -- event {1} put on the queue ...".format(
            now.strftime("%Y/%m/%d %H:%M:%S"), event.src_path))


def print_func(event, delay=5):
    '''Stand-in for the real per-file work; submitted to a pool worker.

    Sleeps *delay* seconds (default 5, simulating a long job) and then
    logs the event's ``src_path`` with a UTC timestamp.
    '''
    time.sleep(delay)
    # Aware UTC time; datetime.utcnow() is deprecated since Python 3.12.
    now = datetime.datetime.now(datetime.timezone.utc)
    print("{0} -- Pulling {1} off the queue ...".format(
        now.strftime("%Y/%m/%d %H:%M:%S"), event.src_path))

def info(title):
    '''Print *title*, then this module's name and the parent/current PIDs.'''
    rows = (
        (title,),
        ('module name:', __name__),
        ('parent process:', os.getppid()),
        ('process id:', os.getpid()),
    )
    for fields in rows:
        print(*fields)

def _report_task_error(exc):
    '''Pool error_callback: print a failed task's exception instead of losing it.'''
    print("Task failed: {0!r}".format(exc))


def process_load_queue(q, processes=None):
    '''Worker thread function: hand each queued watchdog event to a process pool.

    A single Pool is created up front and shared by every event, so several
    files are processed in parallel. Creating a fresh Pool per event and never
    closing it would leak worker processes.

    Args
    ==========
      q:  Queue() object of watchdog events. ``None`` acts as a stop
          sentinel: no further events are accepted, and the call returns
          once every already-submitted task has finished.
      processes:  number of worker processes; ``None`` lets Pool use
          os.cpu_count().
    '''
    pool = Pool(processes=processes)
    try:
        # q.get() blocks until an event arrives, so there is no need to poll
        # q.empty() (unreliable for multiprocessing queues) and sleep.
        for event in iter(q.get, None):
            pool.apply_async(print_func, (event,),
                             error_callback=_report_task_error)
        pool.close()
        pool.join()  # let already-submitted files finish processing
    except BaseException:
        pool.terminate()
        raise



if __name__ == '__main__':
    import queue

    # Events are produced by the observer's handler and consumed by the worker
    # thread below. Both are threads of this process, so a thread-safe
    # queue.Queue suffices; a multiprocessing.Queue would pickle every event
    # through a pipe for no benefit.
    watchdog_queue = queue.Queue()

    # Set up watchdog to monitor the directory for trigger files.
    patt = ["*"]
    path_watch = r"D:\watcher"  # raw string: "\w" is an invalid escape sequence
    event_handler = FileLoaderWatchdog(watchdog_queue, patterns=patt)
    observer = Observer()
    observer.schedule(event_handler, path=path_watch)

    # Worker thread that hands each queued event to the process pool.
    # daemon=True replaces the deprecated Thread.setDaemon(True); a daemon
    # thread does not keep the interpreter alive at exit.
    worker = threading.Thread(target=process_load_queue,
                              args=(watchdog_queue,), daemon=True)
    worker.start()
    observer.start()

    try:
        while True:
            time.sleep(2)
    except KeyboardInterrupt:
        pass
    finally:
        # Stop and join the observer on Ctrl-C and on any other exit path.
        observer.stop()
        observer.join()