在 python 看门狗中并行处理多个 onCreated 事件
Process Multiple onCreated events parallelly in python watchdog
我正在尝试检测目录中是否创建了任何新文件;如果创建了我想处理它(需要 10 分钟输出),同时其他新文件也会在文件夹中创建。
我如何使用多进程注册看门狗的 oncreated,这样它不会等待一个文件完成,而是在每次创建文件时生成一个新进程。
import time
import datetime
from watchdog.observers import Observer
from watchdog.events import PatternMatchingEventHandler
import multiprocessing as mp
def on_created(event):
print(f"hey, {event.src_path} has been created!")
time.sleep(10)
doProcessing(event.src_path)
print(f"hey for {event.src_path}")
if __name__ == "__main__":
patterns = "*"
ignore_patterns = ""
ignore_directories = False
case_sensitive = True
my_event_handler = PatternMatchingEventHandler(patterns, ignore_patterns, ignore_directories, case_sensitive)
path = "D:\watcher"
go_recursively = True
my_observer = Observer()
my_observer.schedule(my_event_handler, path, recursive=go_recursively)
my_observer.start()
my_event_handler.on_created = on_created
#my_event_handler.on_deleted = on_deleted
#my_event_handler.on_modified = on_modified
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
my_observer.stop()
my_observer.join()
def doProcessing(filename):
print("Processing")
抱歉代码中有这么多注释部分;本质上 pool.apply_async(print_func, (event,))
是帮助解决问题的方法;一旦事件被推入队列; process_on_load 函数遍历队列并异步运行 print_func.
# -*- coding: utf-8 -*-
"""
Created on Mon Oct 21 22:02:55 2019
@author: 1009758
"""
import os
import time
import datetime
from watchdog.observers import Observer
from watchdog.events import PatternMatchingEventHandler
import multiprocessing as mp
from multiprocessing import Process
from multiprocessing import Queue
import threading
from multiprocessing import Pool
PROCESSES = mp.cpu_count() - 1
NUMBER_OF_TASKS = 10
class FileLoaderWatchdog(PatternMatchingEventHandler):
''' Watches a nominated directory and when a * type file is
moved
'''
def __init__(self, queue, patterns):
PatternMatchingEventHandler.__init__(self, patterns=patterns)
self.queue = queue
def process(self, event):
'''
event.event_type
'modified' | 'created' | 'moved' | 'deleted'
event.is_directory
True | False
event.src_path
path/to/observed/file
'''
self.queue.put(event)
def on_created(self, event):
self.process(event)
now = datetime.datetime.utcnow()
#print(f"hey for {event.src_path}")
print ("{0} -- event {1} off the queue ...".format(now.strftime("%Y/%m/%d %H:%M:%S"), event.src_path))
def print_func(event):
time.sleep(5)
now = datetime.datetime.utcnow()
print ("{0} -- Pulling {1} off the queue ...".format(now.strftime("%Y/%m/%d %H:%M:%S"), event.src_path))
def info(title):
print(title)
print('module name:', __name__)
print('parent process:', os.getppid())
print('process id:', os.getpid())
def process_load_queue(q):
'''This is the worker thread function. It is run as a daemon
threads that only exit when the main thread ends.
Args
==========
q: Queue() object
'''
while True:
if not q.empty():
#mp.set_start_method('spawn')
event = q.get()
pool = Pool(processes=1)
pool.apply_async(print_func, (event,))
##p = Pool(5)
#p.map(print_func,(event,))
#print_func(event)
#info('main line')
#procs = []
#proc = Process(target=print_func, args=(event,))
#procs.append(proc)
#proc.start()
#for proc in procs:
# proc.join()
#print ("{0} -- Pulling {1} off the queue ...".format(now.strftime("%Y/%m/%d %H:%M:%S"), event.src_path))
#time.sleep(5)
# now2 = datetime.datetime.utcnow()
#print ("{0} -- Replying {1} off the queue ...".format(now2.strftime("%Y/%m/%d %H:%M:%S"), event.src_path))
else:
time.sleep(1)
if __name__ == '__main__':
# create queue
watchdog_queue = Queue()
# Set up a worker thread to process database load
# setup watchdog to monitor directory for trigger files
#args = sys.argv[1:]
patt = ["*"]
path_watch = "D:\watcher"
event_handler = FileLoaderWatchdog(watchdog_queue, patterns=patt)
observer = Observer()
observer.schedule(event_handler, path=path_watch)
observer.start()
#pool=Pool(processes = 1)
#pool.apply_async(process_load_queue, (watchdog_queue,))
worker = threading.Thread(target=process_load_queue, args=(watchdog_queue,))
worker.setDaemon(True)
worker.start()
#p = Pool(2)
#p.map(observer,watchdog_queue)
#asyncio.run(main())
try:
while True:
time.sleep(2)
except KeyboardInterrupt:
observer.stop()
observer.join()
我正在尝试检测目录中是否创建了任何新文件;如果创建了我想处理它(需要 10 分钟输出),同时其他新文件也会在文件夹中创建。
我如何使用多进程注册看门狗的 oncreated,这样它不会等待一个文件完成,而是在每次创建文件时生成一个新进程。
import time
import datetime
from watchdog.observers import Observer
from watchdog.events import PatternMatchingEventHandler
import multiprocessing as mp
def on_created(event):
print(f"hey, {event.src_path} has been created!")
time.sleep(10)
doProcessing(event.src_path)
print(f"hey for {event.src_path}")
if __name__ == "__main__":
patterns = "*"
ignore_patterns = ""
ignore_directories = False
case_sensitive = True
my_event_handler = PatternMatchingEventHandler(patterns, ignore_patterns, ignore_directories, case_sensitive)
path = "D:\watcher"
go_recursively = True
my_observer = Observer()
my_observer.schedule(my_event_handler, path, recursive=go_recursively)
my_observer.start()
my_event_handler.on_created = on_created
#my_event_handler.on_deleted = on_deleted
#my_event_handler.on_modified = on_modified
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
my_observer.stop()
my_observer.join()
def doProcessing(filename):
print("Processing")
抱歉代码中有这么多注释部分;本质上 pool.apply_async(print_func, (event,))
是帮助解决问题的方法;一旦事件被推入队列; process_on_load 函数遍历队列并异步运行 print_func.
# -*- coding: utf-8 -*-
"""
Created on Mon Oct 21 22:02:55 2019
@author: 1009758
"""
import os
import time
import datetime
from watchdog.observers import Observer
from watchdog.events import PatternMatchingEventHandler
import multiprocessing as mp
from multiprocessing import Process
from multiprocessing import Queue
import threading
from multiprocessing import Pool
PROCESSES = mp.cpu_count() - 1
NUMBER_OF_TASKS = 10
class FileLoaderWatchdog(PatternMatchingEventHandler):
''' Watches a nominated directory and when a * type file is
moved
'''
def __init__(self, queue, patterns):
PatternMatchingEventHandler.__init__(self, patterns=patterns)
self.queue = queue
def process(self, event):
'''
event.event_type
'modified' | 'created' | 'moved' | 'deleted'
event.is_directory
True | False
event.src_path
path/to/observed/file
'''
self.queue.put(event)
def on_created(self, event):
self.process(event)
now = datetime.datetime.utcnow()
#print(f"hey for {event.src_path}")
print ("{0} -- event {1} off the queue ...".format(now.strftime("%Y/%m/%d %H:%M:%S"), event.src_path))
def print_func(event):
time.sleep(5)
now = datetime.datetime.utcnow()
print ("{0} -- Pulling {1} off the queue ...".format(now.strftime("%Y/%m/%d %H:%M:%S"), event.src_path))
def info(title):
print(title)
print('module name:', __name__)
print('parent process:', os.getppid())
print('process id:', os.getpid())
def process_load_queue(q):
'''This is the worker thread function. It is run as a daemon
threads that only exit when the main thread ends.
Args
==========
q: Queue() object
'''
while True:
if not q.empty():
#mp.set_start_method('spawn')
event = q.get()
pool = Pool(processes=1)
pool.apply_async(print_func, (event,))
##p = Pool(5)
#p.map(print_func,(event,))
#print_func(event)
#info('main line')
#procs = []
#proc = Process(target=print_func, args=(event,))
#procs.append(proc)
#proc.start()
#for proc in procs:
# proc.join()
#print ("{0} -- Pulling {1} off the queue ...".format(now.strftime("%Y/%m/%d %H:%M:%S"), event.src_path))
#time.sleep(5)
# now2 = datetime.datetime.utcnow()
#print ("{0} -- Replying {1} off the queue ...".format(now2.strftime("%Y/%m/%d %H:%M:%S"), event.src_path))
else:
time.sleep(1)
if __name__ == '__main__':
# create queue
watchdog_queue = Queue()
# Set up a worker thread to process database load
# setup watchdog to monitor directory for trigger files
#args = sys.argv[1:]
patt = ["*"]
path_watch = "D:\watcher"
event_handler = FileLoaderWatchdog(watchdog_queue, patterns=patt)
observer = Observer()
observer.schedule(event_handler, path=path_watch)
observer.start()
#pool=Pool(processes = 1)
#pool.apply_async(process_load_queue, (watchdog_queue,))
worker = threading.Thread(target=process_load_queue, args=(watchdog_queue,))
worker.setDaemon(True)
worker.start()
#p = Pool(2)
#p.map(observer,watchdog_queue)
#asyncio.run(main())
try:
while True:
time.sleep(2)
except KeyboardInterrupt:
observer.stop()
observer.join()