相当于 contextmanager=>yield stdout 的日志记录上下文管理器
Context manager for logging equivalent to contextmanager=>yield stdout
我正在使用的库接受如下形式的函数作为参数:
@contextlib.contextmanager
def stdout_cm():
    """Context manager that yields the current ``sys.stdout``.

    Nothing is set up or torn down; the stream is looked up when the
    ``with`` block is entered and handed to the caller unchanged.
    """
    yield sys.stdout
现在,我不想调用 foo(stdout_cm),而是想传入一个类似 tee 的 logging 实例,把输出同时发送给多个处理程序(标准输出和 StringIO),该怎么做?
尝试:
from sys import stdout, stderr
from cStringIO import StringIO
from logging import getLogger, basicConfig, StreamHandler
from subprocess import check_call
# Configure the root logger: timestamped record format, INFO threshold.
basicConfig(format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
datefmt='%m-%d %H:%M', level='INFO')
# Module-level logger that the two handlers below are attached to.
log = getLogger(__name__)
# Handler backed by an in-memory StringIO so the captured output can be read
# back later through console.stream.getvalue().
console = StreamHandler(StringIO())
log.addHandler(console)
# Second handler that echoes the same records to the process's stdout.
log.addHandler(StreamHandler(stdout))
class LoggerContext(object):
    """Context manager that temporarily changes a logger's level.

    On entry the logger is switched to ``level`` (when one was given). On exit
    the previous level is restored and, if ``close`` is true, every handler
    attached to the logger is closed.

    Args:
        logger: The logger to manage.
        level: Level to apply inside the ``with`` block, or ``None`` to leave
            the logger's level untouched.
        close: Whether to close the logger's handlers on exit.
    """

    def __init__(self, logger, level=None, close=True):
        self.logger = logger
        self.level = level
        self.close = close
        # Level in effect before __enter__ changed it; None until then.
        self.old_level = None

    def __call__(self):
        """Return ``self`` so an instance can be used as a context-manager factory."""
        return self

    def __enter__(self):
        """Apply the temporary level.

        Returns ``None``, so ``with ctx as x`` binds ``x`` to ``None``.
        """
        if self.level is not None:
            self.old_level = self.logger.level
            self.logger.setLevel(self.level)

    def __exit__(self, et, ev, tb):
        """Restore the saved level, then close the handlers if requested."""
        # Restore before the `close` check so the level is put back even when
        # the handlers are meant to stay open.
        if self.level is not None and self.old_level is not None:
            self.logger.setLevel(self.old_level)
        if not self.close:
            return
        for handler in self.logger.handlers:
            handler.close()
简化函数:
def foo(f):
    """Run a small shell pipeline, sending its output to the stream ``f`` provides.

    Args:
        f: Zero-argument callable returning a context manager. The value the
            context manager yields is passed to ``check_call`` as both
            ``stdout`` and ``stderr``.

    Raises:
        subprocess.CalledProcessError: If the shell command exits non-zero.
    """
    with f() as fh:
        # With shell=True the command should be one string; extra list items
        # would become arguments to the shell itself, not to the command.
        check_call("printf '\n\n' | wc -l", shell=True, stderr=fh, stdout=fh)
# Call foo with a LoggerContext instance; the instance is callable and returns
# itself, so foo can use it as a context-manager factory. The trailing comment
# shows an optional level override that is currently disabled.
foo(LoggerContext(log)) #, level='WARN')
# Print whatever the StringIO-backed handler captured (Python 2 print statement).
print console.stream.getvalue()
序曲:
import os
import logging
import subprocess
from io import IOBase
from sys import stdout
from select import select
from threading import Thread
from time import sleep
log = logging.getLogger(__name__)
Class(基于):
class StreamLogger(IOBase):
    """File-like object whose file descriptor feeds a logger, line by line.

    An OS pipe is created on construction. ``fileno()`` exposes the write end,
    so an instance can be passed to ``subprocess`` as ``stdout``/``stderr``.
    A background thread reads the other end, splits the bytes on newlines and
    logs each line to ``logger_obj`` at ``level``.

    Args:
        logger_obj: Object providing ``log(level, msg)``.
        level: Level passed to ``logger_obj.log`` for every line.
    """

    # Reader state: True while running, False once a stop has been requested,
    # None when the reader thread is not (or no longer) running.
    _run = None

    def __init__(self, logger_obj, level):
        super(StreamLogger, self).__init__()
        self.logger_obj = logger_obj
        self.level = level
        self.pipe = os.pipe()
        # Flag the reader as running *before* starting it, so a close() that
        # arrives immediately still finds a live reader to stop.
        self._run = True
        self.thread = Thread(target=self._flusher)
        self.thread.start()

    def __call__(self):
        """Return ``self`` so the instance doubles as a context-manager factory."""
        return self

    def _flusher(self):
        """Reader-thread body: forward complete lines from the pipe to the logger."""
        read_fd = self.pipe[0]
        buf = b''
        try:
            while True:
                stopping = not self._run
                # Wait up to a second while running; once a stop has been
                # requested, only drain what is already sitting in the pipe.
                timeout = 0 if stopping else 1
                if select([read_fd], [], [], timeout)[0]:
                    chunk = os.read(read_fd, 1024)
                    if not chunk:
                        break  # EOF: every write end has been closed
                    buf += chunk
                    while b'\n' in buf:
                        data, buf = buf.split(b'\n', 1)
                        # Undecodable bytes must not kill the reader thread.
                        self.write(data.decode(errors='replace'))
                elif stopping:
                    break
            if buf:
                # Emit a final line that had no trailing newline.
                self.write(buf.decode(errors='replace'))
        finally:
            # Always signal termination, even if logging raised.
            self._run = None

    def write(self, data):
        """Log ``data`` at the configured level and return the logger's result."""
        return self.logger_obj.log(self.level, data)

    def fileno(self):
        """Return the descriptor of the pipe's write end."""
        return self.pipe[1]

    def close(self):
        """Stop the reader, flush pending output and close both pipe ends.

        Safe to call more than once; later calls do nothing.
        """
        if self.closed:
            return
        try:
            if self._run:
                self._run = False
            # Closing our write end lets the reader see EOF immediately when
            # no child process still holds a copy of the descriptor.
            os.close(self.pipe[1])
            self.thread.join()
        finally:
            os.close(self.pipe[0])
            # Mark the IOBase as closed so __exit__/__del__ do not try to
            # close the (possibly reused) descriptors a second time.
            super(StreamLogger, self).close()
用法:
# Anything the child writes to `err` is logged at ERROR level; `out` is
# available for stdout in the same way.
with StreamLogger(log, logging.INFO) as out, StreamLogger(log,
                                                         logging.ERROR) as err:
    # Wait for the child before leaving the block; otherwise the pipes can be
    # closed before the command has written its output.
    subprocess.Popen('ls 1>&2', stderr=err, shell=True).wait()
我正在使用的库接受如下形式的函数作为参数:
@contextlib.contextmanager
def stdout_cm():
    """Context manager that yields the current ``sys.stdout``.

    Nothing is set up or torn down; the stream is looked up when the
    ``with`` block is entered and handed to the caller unchanged.
    """
    yield sys.stdout
现在,我不想调用 foo(stdout_cm),而是想传入一个类似 tee 的 logging 实例,把输出同时发送给多个处理程序(标准输出和 StringIO),该怎么做?
尝试:
from sys import stdout, stderr
from cStringIO import StringIO
from logging import getLogger, basicConfig, StreamHandler
from subprocess import check_call
# Configure the root logger: timestamped record format, INFO threshold.
basicConfig(format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
datefmt='%m-%d %H:%M', level='INFO')
# Module-level logger that the two handlers below are attached to.
log = getLogger(__name__)
# Handler backed by an in-memory StringIO so the captured output can be read
# back later through console.stream.getvalue().
console = StreamHandler(StringIO())
log.addHandler(console)
# Second handler that echoes the same records to the process's stdout.
log.addHandler(StreamHandler(stdout))
class LoggerContext(object):
    """Context manager that temporarily changes a logger's level.

    On entry the logger is switched to ``level`` (when one was given). On exit
    the previous level is restored and, if ``close`` is true, every handler
    attached to the logger is closed.

    Args:
        logger: The logger to manage.
        level: Level to apply inside the ``with`` block, or ``None`` to leave
            the logger's level untouched.
        close: Whether to close the logger's handlers on exit.
    """

    def __init__(self, logger, level=None, close=True):
        self.logger = logger
        self.level = level
        self.close = close
        # Level in effect before __enter__ changed it; None until then.
        self.old_level = None

    def __call__(self):
        """Return ``self`` so an instance can be used as a context-manager factory."""
        return self

    def __enter__(self):
        """Apply the temporary level.

        Returns ``None``, so ``with ctx as x`` binds ``x`` to ``None``.
        """
        if self.level is not None:
            self.old_level = self.logger.level
            self.logger.setLevel(self.level)

    def __exit__(self, et, ev, tb):
        """Restore the saved level, then close the handlers if requested."""
        # Restore before the `close` check so the level is put back even when
        # the handlers are meant to stay open.
        if self.level is not None and self.old_level is not None:
            self.logger.setLevel(self.old_level)
        if not self.close:
            return
        for handler in self.logger.handlers:
            handler.close()
简化函数:
def foo(f):
    """Run a small shell pipeline, sending its output to the stream ``f`` provides.

    Args:
        f: Zero-argument callable returning a context manager. The value the
            context manager yields is passed to ``check_call`` as both
            ``stdout`` and ``stderr``.

    Raises:
        subprocess.CalledProcessError: If the shell command exits non-zero.
    """
    with f() as fh:
        # With shell=True the command should be one string; extra list items
        # would become arguments to the shell itself, not to the command.
        check_call("printf '\n\n' | wc -l", shell=True, stderr=fh, stdout=fh)
# Call foo with a LoggerContext instance; the instance is callable and returns
# itself, so foo can use it as a context-manager factory. The trailing comment
# shows an optional level override that is currently disabled.
foo(LoggerContext(log)) #, level='WARN')
# Print whatever the StringIO-backed handler captured (Python 2 print statement).
print console.stream.getvalue()
序曲:
import os
import logging
import subprocess
from io import IOBase
from sys import stdout
from select import select
from threading import Thread
from time import sleep
log = logging.getLogger(__name__)
Class(基于):
class StreamLogger(IOBase):
    """File-like object whose file descriptor feeds a logger, line by line.

    An OS pipe is created on construction. ``fileno()`` exposes the write end,
    so an instance can be passed to ``subprocess`` as ``stdout``/``stderr``.
    A background thread reads the other end, splits the bytes on newlines and
    logs each line to ``logger_obj`` at ``level``.

    Args:
        logger_obj: Object providing ``log(level, msg)``.
        level: Level passed to ``logger_obj.log`` for every line.
    """

    # Reader state: True while running, False once a stop has been requested,
    # None when the reader thread is not (or no longer) running.
    _run = None

    def __init__(self, logger_obj, level):
        super(StreamLogger, self).__init__()
        self.logger_obj = logger_obj
        self.level = level
        self.pipe = os.pipe()
        # Flag the reader as running *before* starting it, so a close() that
        # arrives immediately still finds a live reader to stop.
        self._run = True
        self.thread = Thread(target=self._flusher)
        self.thread.start()

    def __call__(self):
        """Return ``self`` so the instance doubles as a context-manager factory."""
        return self

    def _flusher(self):
        """Reader-thread body: forward complete lines from the pipe to the logger."""
        read_fd = self.pipe[0]
        buf = b''
        try:
            while True:
                stopping = not self._run
                # Wait up to a second while running; once a stop has been
                # requested, only drain what is already sitting in the pipe.
                timeout = 0 if stopping else 1
                if select([read_fd], [], [], timeout)[0]:
                    chunk = os.read(read_fd, 1024)
                    if not chunk:
                        break  # EOF: every write end has been closed
                    buf += chunk
                    while b'\n' in buf:
                        data, buf = buf.split(b'\n', 1)
                        # Undecodable bytes must not kill the reader thread.
                        self.write(data.decode(errors='replace'))
                elif stopping:
                    break
            if buf:
                # Emit a final line that had no trailing newline.
                self.write(buf.decode(errors='replace'))
        finally:
            # Always signal termination, even if logging raised.
            self._run = None

    def write(self, data):
        """Log ``data`` at the configured level and return the logger's result."""
        return self.logger_obj.log(self.level, data)

    def fileno(self):
        """Return the descriptor of the pipe's write end."""
        return self.pipe[1]

    def close(self):
        """Stop the reader, flush pending output and close both pipe ends.

        Safe to call more than once; later calls do nothing.
        """
        if self.closed:
            return
        try:
            if self._run:
                self._run = False
            # Closing our write end lets the reader see EOF immediately when
            # no child process still holds a copy of the descriptor.
            os.close(self.pipe[1])
            self.thread.join()
        finally:
            os.close(self.pipe[0])
            # Mark the IOBase as closed so __exit__/__del__ do not try to
            # close the (possibly reused) descriptors a second time.
            super(StreamLogger, self).close()
用法:
# Anything the child writes to `err` is logged at ERROR level; `out` is
# available for stdout in the same way.
with StreamLogger(log, logging.INFO) as out, StreamLogger(log,
                                                         logging.ERROR) as err:
    # Wait for the child before leaving the block; otherwise the pipes can be
    # closed before the command has written its output.
    subprocess.Popen('ls 1>&2', stderr=err, shell=True).wait()