Python 3.9 Watchdog, start process for each event

I wrote some Python classes to handle newly created files using Python's watchdog module. Whenever a new-file event is triggered, a process should be started to take care of processing that file. Files arrive from different sources at the same time, so I expect several processes to be working on files "simultaneously".

According to the system log, only a single process is ever created, so there is no multiprocessing going on. I only ever see lines like

Sep 21 13:53:02 host1 python3.9[6704] : ...

in the log, all with exactly the same PID (6704 in this case).

Can anyone tell me what I am doing wrong?

Contents of fileMonitor.py:

import time
from watchdog.observers.polling import PollingObserver
from watchdog.events import RegexMatchingEventHandler

import converter

##
## \brief Class to handle monitored events
##
class LogFileEventHandler(RegexMatchingEventHandler):

    MONITOR_REGEX = [r'.*\.(gz|txt)$']  # watch out for new "*.gz" or "*.txt"-files only
    IGNORE_REGEX = [r'.*/archive/*']    # ignore events below path "*/archive/*"

    ###
    ### Public methods
    ###

    def __init__(self):
        super().__init__(
            regexes=self.MONITOR_REGEX,
            ignore_regexes=self.IGNORE_REGEX,
            ignore_directories=True,
            case_sensitive=False)
        self.cm = converter.ConverterManager()

    def on_created(self, event):
        self.cm.convertMemory(event.src_path)

    def on_moved(self, event):
        self.cm.convertMemory(event.dest_path)

##
## \brief Class to monitor changes in filesystem
## \note Has to use PollingObserver due to network-filesystem.
##       There is no OS-API supporting notification on network-filesystem changes.
##
class LogFileMonitor:

    ###
    ### Public methods
    ###

    def __init__(self, monitorPath):
        self.monitorPath = monitorPath                     # Path to monitor
        self.handler = LogFileEventHandler()               # Handler for events occurred
        self.observer = PollingObserver()                  # Method for monitoring

    def run(self):
        self._start()                                      # Prepare observer
        try:
            while True:
                time.sleep(1)                              # Suspend for some time
        except KeyboardInterrupt:
            self._stop()                                   # Terminate observer

    ###
    ### Private methods
    ###

    def _start(self):
        self.observer.schedule(                            # prepare observer
            event_handler=self.handler,
            path=self.monitorPath,
            recursive=True,
        )
        self.observer.start()                              # start observer

    def _stop(self):
        self.observer.stop()                               # stop observer
        self.observer.join()                               # wait till observer terminated
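
For reference, a minimal sketch of how this monitor might be started (the entry-point file and the monitored path are assumptions, not part of the original code):

import fileMonitor

if __name__ == '__main__':
    monitor = fileMonitor.LogFileMonitor('/mnt/logs')  # hypothetical path
    monitor.run()                                      # blocks until Ctrl+C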

Contents of converter.py:

import time
import concurrent.futures

##
## \brief Class to analyze logger data in memory and write it to influxdb
##
class ConverterMemoryWorker:

    ###
    ### Public methods
    ###

    def __init__(self, logFile):
        self.logFile = logFile

    def run(self):
        time.sleep(30) # Do some long-running work here

##
## \brief Class to manage converter workers
##
class ConverterManager:

    ###
    ### Public methods
    ###

    def __init__(self):
        print('Created new instance of ConverterManager')

    def convertMemory(self, logFile):
        with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor: # create a new process from pool
            executor.submit(self._task(logFile))                                # start worker-process

    ###
    ### Private methods
    ###

    def _task(self, logFile):
        converterWorker = ConverterMemoryWorker(logFile)
        converterWorker.run()

Solved!

Thank you for your hints, they almost solved my problem. On top of them I also had to declare the "_task" method static to finally make it work.

Here is the modified code that works for me:

import time
import concurrent.futures

##
## \brief Class to analyze logger data in memory and write it to influxdb
##
class ConverterMemoryWorker:

    ###
    ### Public methods
    ###

    def __init__(self, logFile):
        self.logFile = logFile

    def run(self):
        print(f'Started process for {self.logFile}')
        time.sleep(10) # Do some long-running work here
        print(f'Terminated process for {self.logFile}')

##
## \brief Class to manage converter workers
##
class ConverterManager:

    executor = None

    ###
    ### Public methods
    ###

    def __init__(self):
        print('Created new instance of ConverterManager')
        self.executor = concurrent.futures.ProcessPoolExecutor(max_workers=4)

    def convertMemory(self, logFile):
        self.executor.submit(self._task, logFile)                                # start worker-process

    ###
    ### Private methods
    ###
    @staticmethod
    def _task(logFile):
        converterWorker = ConverterMemoryWorker(logFile)
        converterWorker.run()


if __name__ == '__main__':
    cm = ConverterManager()

    for i in range(30):
        cm.convertMemory(i)
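
A side note on why the @staticmethod mattered: ProcessPoolExecutor has to pickle whatever is submitted to it, and pickling a bound method also pickles the instance it is bound to; here that is a ConverterManager whose executor attribute contains locks and queues that cannot be pickled. A minimal sketch of that failure mode (Holder is a made-up stand-in for ConverterManager):

import pickle
import concurrent.futures

class Holder:
    def __init__(self):
        # executors hold locks/queues and are not picklable
        self.executor = concurrent.futures.ProcessPoolExecutor(max_workers=1)

    def task(self, logFile):
        pass

holder = Holder()
# pickling the bound method pickles the whole instance, executor included
try:
    pickle.dumps(holder.task)
except TypeError as exc:
    print(f'bound method is not picklable: {exc}')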

The problem is with this part of convertMemory:

with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
    executor.submit(self._task(logFile))

This with statement is roughly equivalent to:

executor = concurrent.futures.ProcessPoolExecutor(max_workers=4)
try:
    executor.submit(self._task(logFile))
finally:
    executor.shutdown()

This is described in the documentation for concurrent.futures.Executor.shutdown. Because each executor only ever has a single task submitted to it, it shuts down again immediately afterwards (making the caller block until that task has finished its work), so the tasks never get a chance to run concurrently.
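
A minimal sketch that makes this blocking visible (the slow function and the timings are purely illustrative):

import time
import concurrent.futures

def slow(n):
    time.sleep(2)  # stand-in for real work
    return n

if __name__ == '__main__':
    start = time.monotonic()
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        executor.submit(slow, 1)
    # leaving the with-block calls shutdown(), which waits for the task,
    # so we only get here after roughly 2 seconds instead of immediately
    print(f'elapsed: {time.monotonic() - start:.1f} s')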

The alternative is to create a single executor that is shared by all tasks and to call shutdown on it once, when the program ends.
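
A minimal sketch of that alternative (the _task function and the file names are made up for illustration):

import concurrent.futures

def _task(logFile):
    ...  # long-running conversion work would go here

if __name__ == '__main__':
    executor = concurrent.futures.ProcessPoolExecutor(max_workers=4)
    try:
        for logFile in ('a.gz', 'b.txt', 'c.gz'):  # hypothetical files
            executor.submit(_task, logFile)        # returns immediately
    finally:
        executor.shutdown()  # once, at program end: waits for all tasks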


Separately from this, when you do:

executor.submit(self._task(logFile))

...it is equivalent to:

result = self._task(logFile)   # result becomes None
executor.submit(None)

So it is actually doing the work immediately, without ever submitting it to the executor.

You probably want this instead:

executor.submit(self._task, logFile)

...as described in the concurrent.futures.Executor.submit documentation. You pass it the function you want the child process to call, together with all the arguments you want it called with, but crucially you do not call it yourself.
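
To make the difference visible, a minimal sketch (the echo function is made up); the submitted call prints a different PID from the parent's, which is exactly what was missing from the logs in the question:

import os
import concurrent.futures

def echo(tag):
    return f'{tag} ran in pid {os.getpid()}'

if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
        future = executor.submit(echo, 'submitted')  # runs in a worker process
        print(future.result())  # worker PID, different from ours
        print(echo('direct'))   # runs right here, in the parent process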