如何缓冲来自多线程函数调用的日志,以便按函数完成的顺序记录它们?
How to buffer logs from multithreaded function call so that they can be logged in order the functions finish?
问题
我正在尝试使用 concurrent.futures
库来 运行 "things" 列表中的函数。代码看起来像这样。
import concurrent.futures
import logging

logger = logging.getLogger(__name__)


def process_thing(thing, count):
    """Process a single *thing*, logging its start and finish."""
    logger.info(f'starting processing for thing {count}')
    # Do some io related stuff
    logger.info(f'finished processing for thing {count}')


def process_things_concurrently(things):
    """Run process_thing() over *things* on a thread pool.

    Log lines from different workers may interleave in any order.
    FIX: the original ``def`` line was missing its trailing colon
    (a SyntaxError).
    """
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = []
        for count, thing in enumerate(things):
            futures.append(executor.submit(process_thing, thing, count))
        # result() re-raises any exception a worker thread produced.
        for future in concurrent.futures.as_completed(futures):
            future.result()
就像现在的代码一样,日志记录可以按任何顺序发生。
例如:
starting processing for thing 2
starting processing for thing 1
finished processing for thing 2
finished processing for thing 1
我想更改代码,以便缓冲 process_thing()
的特定调用的记录,直到将来完成。
换句话说,同一次调用产生的所有日志记录被归为一组,这些日志组按调用完成的先后顺序输出。
所以从上面的例子来看,上面的日志输出看起来像
starting processing for thing 2
finished processing for thing 2
starting processing for thing 1
finished processing for thing 1
我试过的
我尝试为每个调用创建一个记录器,每个调用都有自己的自定义处理程序,可能是子类化 BufferingHandler。但最终会有很多 "things" 我读到制作很多记录器是不好的。
我对任何有用的东西都持开放态度!谢谢。
这里有一个 DelayedLogger
类的小示例,它把对 logger
方法的所有调用先放入一个列表而不是立即执行,直到你最终调用 flush
时才把它们全部真正输出。
from functools import partial


class DelayedLogger:
    """Wrap a logging.Logger, buffering log calls until flush().

    Calls to info/debug/warning/error/critical are recorded instead of
    emitted; flush() replays them on the wrapped logger in order. Any
    other attribute access is proxied straight to the real logger.
    """

    def __init__(self, logger):
        self.logger = logger
        self._call_stack = []  # list of (method, args, kwargs) tuples
        self._delayed_methods = {
            name: partial(self._delayed_method_proxy, getattr(logger, name))
            for name in ["info", "debug", "warning", "error", "critical"]
        }

    def __getattr__(self, name):
        """Proxy getattr to self.logger, except for self._delayed_methods."""
        # FIX: the original used dict.get(name, getattr(self.logger, name)),
        # which evaluates the fallback eagerly — it touched the wrapped
        # logger even when *name* was a buffered method. Look up lazily.
        try:
            return self._delayed_methods[name]
        except KeyError:
            return getattr(self.logger, name)

    def _delayed_method_proxy(self, method, *args, **kwargs):
        # Record the bound logger method and its arguments for later replay.
        self._call_stack.append((method, args, kwargs))

    def flush(self):
        """Flush self._call_stack to the real logger."""
        for method, args, kwargs in self._call_stack:
            method(*args, **kwargs)
        self._call_stack = []
在您的示例中,您可以像这样使用它:
import logging

logger = logging.getLogger(__name__)


def process_thing(thing, count):
    """Process one thing, buffering its log lines until the call ends."""
    buffered = DelayedLogger(logger)
    buffered.info(f'starting processing for thing {count}')
    # Do some io related stuff
    buffered.info(f'finished processing for thing {count}')
    # Emit everything recorded for this call as one contiguous group.
    buffered.flush()


process_thing(None, 10)
可能有一些方法可以美化它或使其更紧凑,但如果那是您真正想要的,它应该可以完成工作。
首先我修改了@Jeronimo 的回答以提出这个
class DelayedLogger:
    """Per-thread buffered logging: one ThreadLogger buffer per worker.

    Each worker obtains its own ThreadLogger via new_thread(count); the
    coordinating thread later replays each buffer, in completion order,
    via get_thread(count).flush().
    """

    class ThreadLogger:
        """Buffers log calls issued from a single thread until flush()."""

        def __init__(self, logger):
            self._call_stack = []  # list of (method, args, kwargs) tuples
            self.logger = logger
            self._delayed_methods = {
                name: partial(self._delayed_method_proxy, getattr(logger, name))
                for name in ["info", "debug", "warning", "error", "critical"]
            }

        def __getattr__(self, name):
            """Proxy getattr to self.logger, except for self._delayed_methods."""
            return self._delayed_methods.get(name, getattr(self.logger, name))

        def _delayed_method_proxy(self, method, *args, **kwargs):
            # Record the bound logger method and its arguments for replay.
            self._call_stack.append((method, args, kwargs))

        def flush(self):
            """Flush self._call_stack to the real logger."""
            for method, args, kwargs in self._call_stack:
                method(*args, **kwargs)
            self._call_stack = []

    def __init__(self, logger):
        self.logger = logger
        # FIX: the original annotation `typing.Dict[self.ThreadLogger]`
        # raised at runtime — `typing` was never imported, and Dict takes
        # two type parameters. A plain dict (count -> ThreadLogger) is all
        # that is needed here.
        self._thread_loggers = {}

    def new_thread(self, count):
        """Make a new sub-logger that writes to the call stack in its slot."""
        new_logger = self.ThreadLogger(self.logger)
        self._thread_loggers[count] = new_logger
        return new_logger

    def get_thread(self, count):
        """Return the ThreadLogger previously created for *count*."""
        return self._thread_loggers[count]
# Module-level instance shared with the executor example below.
delayed_logger = DelayedLogger(logger)
哪个可以这样用
delayed_logger = DelayedLogger(logger)
with concurrent.futures.ThreadPoolExecutor() as executor:
    # Hand every submitted call its own buffered sub-logger, keyed by count.
    futures = [
        executor.submit(process_thing,
                        count,
                        thing,
                        logger=delayed_logger.new_thread(count))
        for count, thing in enumerate(things)
    ]
    # Replay each call's buffered records as soon as that call finishes.
    for future in concurrent.futures.as_completed(futures):
        count = future.result()
        delayed_logger.get_thread(count).flush()
这里的问题是process_thing()
现在需要将记录器作为参数,而记录器的范围是有限的。如果 process_thing()
调用子例程,则它们的日志记录不会延迟。
可能解决办法就是根本不尝试这样做。相反,线程可以创建日志过滤器或其他一些方式来区分它们的消息。
问题
我正在尝试使用 concurrent.futures
库来 运行 "things" 列表中的函数。代码看起来像这样。
import concurrent.futures
import logging

logger = logging.getLogger(__name__)


def process_thing(thing, count):
    """Process a single *thing*, logging its start and finish."""
    logger.info(f'starting processing for thing {count}')
    # Do some io related stuff
    logger.info(f'finished processing for thing {count}')


def process_things_concurrently(things):
    """Run process_thing() over *things* on a thread pool.

    Log lines from different workers may interleave in any order.
    FIX: the original ``def`` line was missing its trailing colon
    (a SyntaxError).
    """
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = []
        for count, thing in enumerate(things):
            futures.append(executor.submit(process_thing, thing, count))
        # result() re-raises any exception a worker thread produced.
        for future in concurrent.futures.as_completed(futures):
            future.result()
就像现在的代码一样,日志记录可以按任何顺序发生。
例如:
starting processing for thing 2
starting processing for thing 1
finished processing for thing 2
finished processing for thing 1
我想更改代码,以便缓冲 process_thing()
的特定调用的记录,直到将来完成。
换句话说,同一次调用产生的所有日志记录被归为一组,这些日志组按调用完成的先后顺序输出。
所以从上面的例子来看,上面的日志输出看起来像
starting processing for thing 2
finished processing for thing 2
starting processing for thing 1
finished processing for thing 1
我试过的
我尝试为每个调用创建一个记录器,每个调用都有自己的自定义处理程序,可能是子类化 BufferingHandler。但最终会有很多 "things" 我读到制作很多记录器是不好的。
我对任何有用的东西都持开放态度!谢谢。
这里有一个 DelayedLogger
类的小示例,它把对 logger
方法的所有调用先放入一个列表而不是立即执行,直到你最终调用 flush
时才把它们全部真正输出。
from functools import partial


class DelayedLogger:
    """Wrap a logging.Logger, buffering log calls until flush().

    Calls to info/debug/warning/error/critical are recorded instead of
    emitted; flush() replays them on the wrapped logger in order. Any
    other attribute access is proxied straight to the real logger.
    """

    def __init__(self, logger):
        self.logger = logger
        self._call_stack = []  # list of (method, args, kwargs) tuples
        self._delayed_methods = {
            name: partial(self._delayed_method_proxy, getattr(logger, name))
            for name in ["info", "debug", "warning", "error", "critical"]
        }

    def __getattr__(self, name):
        """Proxy getattr to self.logger, except for self._delayed_methods."""
        # FIX: the original used dict.get(name, getattr(self.logger, name)),
        # which evaluates the fallback eagerly — it touched the wrapped
        # logger even when *name* was a buffered method. Look up lazily.
        try:
            return self._delayed_methods[name]
        except KeyError:
            return getattr(self.logger, name)

    def _delayed_method_proxy(self, method, *args, **kwargs):
        # Record the bound logger method and its arguments for later replay.
        self._call_stack.append((method, args, kwargs))

    def flush(self):
        """Flush self._call_stack to the real logger."""
        for method, args, kwargs in self._call_stack:
            method(*args, **kwargs)
        self._call_stack = []
在您的示例中,您可以像这样使用它:
import logging

logger = logging.getLogger(__name__)


def process_thing(thing, count):
    """Process one thing, buffering its log lines until the call ends."""
    buffered = DelayedLogger(logger)
    buffered.info(f'starting processing for thing {count}')
    # Do some io related stuff
    buffered.info(f'finished processing for thing {count}')
    # Emit everything recorded for this call as one contiguous group.
    buffered.flush()


process_thing(None, 10)
可能有一些方法可以美化它或使其更紧凑,但如果那是您真正想要的,它应该可以完成工作。
首先我修改了@Jeronimo 的回答以提出这个
class DelayedLogger:
    """Per-thread buffered logging: one ThreadLogger buffer per worker.

    Each worker obtains its own ThreadLogger via new_thread(count); the
    coordinating thread later replays each buffer, in completion order,
    via get_thread(count).flush().
    """

    class ThreadLogger:
        """Buffers log calls issued from a single thread until flush()."""

        def __init__(self, logger):
            self._call_stack = []  # list of (method, args, kwargs) tuples
            self.logger = logger
            self._delayed_methods = {
                name: partial(self._delayed_method_proxy, getattr(logger, name))
                for name in ["info", "debug", "warning", "error", "critical"]
            }

        def __getattr__(self, name):
            """Proxy getattr to self.logger, except for self._delayed_methods."""
            return self._delayed_methods.get(name, getattr(self.logger, name))

        def _delayed_method_proxy(self, method, *args, **kwargs):
            # Record the bound logger method and its arguments for replay.
            self._call_stack.append((method, args, kwargs))

        def flush(self):
            """Flush self._call_stack to the real logger."""
            for method, args, kwargs in self._call_stack:
                method(*args, **kwargs)
            self._call_stack = []

    def __init__(self, logger):
        self.logger = logger
        # FIX: the original annotation `typing.Dict[self.ThreadLogger]`
        # raised at runtime — `typing` was never imported, and Dict takes
        # two type parameters. A plain dict (count -> ThreadLogger) is all
        # that is needed here.
        self._thread_loggers = {}

    def new_thread(self, count):
        """Make a new sub-logger that writes to the call stack in its slot."""
        new_logger = self.ThreadLogger(self.logger)
        self._thread_loggers[count] = new_logger
        return new_logger

    def get_thread(self, count):
        """Return the ThreadLogger previously created for *count*."""
        return self._thread_loggers[count]
# Module-level instance shared with the executor example below.
delayed_logger = DelayedLogger(logger)
哪个可以这样用
delayed_logger = DelayedLogger(logger)
with concurrent.futures.ThreadPoolExecutor() as executor:
    # Hand every submitted call its own buffered sub-logger, keyed by count.
    futures = [
        executor.submit(process_thing,
                        count,
                        thing,
                        logger=delayed_logger.new_thread(count))
        for count, thing in enumerate(things)
    ]
    # Replay each call's buffered records as soon as that call finishes.
    for future in concurrent.futures.as_completed(futures):
        count = future.result()
        delayed_logger.get_thread(count).flush()
这里的问题是process_thing()
现在需要将记录器作为参数,而记录器的范围是有限的。如果 process_thing()
调用子例程,则它们的日志记录不会延迟。
可能解决办法就是根本不尝试这样做。相反,线程可以创建日志过滤器或其他一些方式来区分它们的消息。