使用 Twisted 记录到 logstash
Logging to logstash with Twisted
我正在尝试使用来自 twisted 的新日志记录 api 将日志发送到 logstash 服务器。
不使用 Twisted 时,我用的是下面这段代码:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import division, absolute_import, \
print_function, unicode_literals
import json
import logging
import logstash
# Plain stdlib logging, no Twisted involved: records go to a logstash TCP
# handler and to a console stream handler.
logger = logging.getLogger('python-logstash-logger')
logger.setLevel(logging.DEBUG)

tcp_handler = logstash.TCPLogstashHandler(b'127.0.0.1', 5001, version=1)
logger.addHandler(tcp_handler)
console_handler = logging.StreamHandler()
logger.addHandler(console_handler)

# The payload is serialised to JSON and logged as the message text.
a = {'a': 1, 'source': 'testing', '_id': 'Test'}
logger.debug(json.dumps(a))
我不明白如何按照 Twisted 的设计理念来实现这段代码。我想我应该实现一个把日志转发到 logstash 服务器的观察者(observer),但我找不到任何说明如何实现的例子。
我做了一个简单的尝试,但没有成功;如果有人能给我指出正确的方向,我将不胜感激:
from logging.config import dictConfig
from twisted.logger import Logger, STDLibLogObserver
from txacme.application import app
# Configure stdlib logging from the application settings, then route
# twisted.logger events to the stdlib logger named 'txacme'.
log_config = app.settings.get(path='txacme.log')
dictConfig(log_config)
l = Logger(observer=STDLibLogObserver('txacme'))

data = {'a': 1, 'source': 'coucou', '_id': 'Test'}
# NOTE(review): Logger.info() presumably expects a format string as its first
# argument; a dict is passed here exactly as in the original attempt -- confirm.
l.info(data)
l.error('Hello')
以及我使用的配置:
# Logging configuration read by dictConfig() via the 'txacme.log' settings path.
txacme:
  log:
    version: 1
    disable_existing_loggers: False
    formatters:
      minimal:
        format: '[%(asctime)-15s] %(message)s'
      simple:
        format: '[%(asctime)-15s][%(levelname)s][%(module)s][%(funcName)s] %(message)s'
    handlers:
      # Console output using the "simple" formatter.
      console:
        class: logging.StreamHandler
        level: !!python/name:logging.DEBUG
        formatter: simple
        stream: ext://sys.stdout
      # TCP handler from python-logstash; the remaining keys are its arguments.
      logstash_tcp: &LOGSTASH
        class: logstash.TCPLogstashHandler
        level: !!python/name:logging.DEBUG
        version: 1
        host: 127.0.0.1
        port: 5001
        message_type: acme
        tags: [acme, prod]
    loggers:
      # Only the logstash handler is attached; records do not propagate further.
      txacme: &acme
        level: !!python/name:logging.DEBUG
        handlers: [logstash_tcp]
        propagate: no
我还找到了一个针对 RabbitMQ 的 consumer 示例,但我不确定该如何把它移植到 logstash 上。
我终于创建了一个 protocol/factory 来连接到 logstash 远程服务器,以及一个异步发送数据(即发即弃)的 LogObserver:
代码的组织方式借鉴了 twisted.logger 包,可以这样使用:
# Keyword arguments for LogstashLogObserver.
settings = {
    # The address must be a string literal; a bare 10.68.0.41 is a syntax error.
    'host': '10.68.0.41',
    'port': 5001,
    'version': 1,
}
# Instantiate the observer with the keyword arguments collected in `settings`.
logstashObserver = LogstashLogObserver(**settings)
__init__.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from ._logstash import LogstashLogObserver
# Public API of the package; the single name is imported from ._logstash.
__all__ = ('LogstashLogObserver',)
_logstash.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import inspect
from twisted import logger
from twisted.internet import defer
from twisted.internet import endpoints
from twisted.internet import protocol
from twisted.internet import reactor
from twisted.internet import task
from twisted.protocols import basic
from zope import interface
from . import _formatter
# use the json event formatter from the twisted logging system
def _formatEvent(event):
    """Return ``event`` serialised as JSON text, without surrounding whitespace.

    Calls ``logger.eventAsJSON`` directly instead of building a file observer
    around a ``None`` file on every call just to borrow its formatter.

    :param event: a twisted.logger event dictionary.
    :return: the JSON text for the event.
    """
    return logger.eventAsJSON(event).strip()
class LogstashClient(basic.LineReceiver):
    """Line-based protocol that sends one serialised event, then disconnects."""

    def connectionMade(self):
        """Turn on TCP keep-alive if the transport provides the method."""
        try:
            transport = self.transport
            transport.setTcpKeepAlive(1)
        except AttributeError:
            # No setTcpKeepAlive on this transport: carry on without it.
            pass

    def emit(self, event):
        """Send ``event`` as a line, close the connection, notify the factory.

        :param event: the already serialised event to send.
        """
        self.sendLine(event)
        # One event per connection: the transport is released right away.
        self.transport.loseConnection()
        self.factory.eventEmitted(event)
class LogstashFactory(protocol.ReconnectingClientFactory):
    """Factory for LogstashClient that keeps lists of waiting Deferreds."""

    protocol = LogstashClient

    def __init__(self):
        self.connected = False
        # Deferreds fired by eventEmitted() and connectionMade() respectively.
        self.eventRequests = []
        self.clientRequests = []

    def connectionMade(self, protocol):
        """Fire every Deferred waiting for a client, passing ``protocol``."""
        # Swap the list out first so callbacks can queue new requests safely.
        pending, self.clientRequests = self.clientRequests, []
        for waiting in pending:
            waiting.callback(protocol)

    def eventEmitted(self, event):
        """Fire every Deferred waiting for an emission, passing ``event``."""
        pending, self.eventRequests = self.eventRequests, []
        for waiting in pending:
            waiting.callback(event)


# Single module-level factory, created once at import time.
# NOTE(review): the original comment says it "will reconnect automatically" --
# confirm that this holds when the factory is used through an endpoint.
_factory = LogstashFactory()
@interface.implementer(logger.ILogObserver)
class LogstashLogObserver(object):
    """twisted.logger observer that forwards each event to a logstash server.

    Every observed event is formatted and then sent over a fresh TCP
    connection on a later reactor iteration (fire and forget).

    :param host: address of the logstash server.
    :param port: TCP port of the logstash server.
    :param prefix: handed to the formatter; when set, only event keys starting
        with it are exported as extra fields.
    :param message_type: value of the message "type" field.
    :param tags: list of tags attached to every message.
    :param fqdn: use the fully qualified host name instead of the short one.
    :param version: ``1`` selects LogstashFormatterVersion1, anything else
        LogstashFormatterVersion0.
    """

    def __init__(self, host, port=5959, prefix=None, message_type='logstash',
                 tags=None, fqdn=False, version=0):
        self.host = host
        self.port = port
        if version == 1:
            formatter = _formatter.LogstashFormatterVersion1
        else:
            formatter = _formatter.LogstashFormatterVersion0
        self.formatter = formatter(prefix, message_type, tags, fqdn)

    def __call__(self, event):
        """Format ``event`` and schedule its delivery to logstash."""
        # log_ prefix is the one used by twisted, risk of collision...
        # see https://twistedmatrix.com/documents/15.2.1/core/howto/logger.html
        # Work on a shallow copy: the event dict is shared with every other
        # observer, so the stack frames must not be injected into the original.
        event = dict(event, log_stack=inspect.stack())
        payload = self.formatter.format(event)
        d = task.deferLater(reactor, 0, self._connect, reactor)
        d.addCallback(lambda client: client.emit(payload))
        # Fire and forget: a delivery failure is dropped on purpose. Leaving it
        # unhandled would make Twisted log it, which feeds this observer again.
        d.addErrback(lambda failure: None)

    def _connect(self, reactor=None):
        """Open a TCP connection to the server.

        :param reactor: reactor to connect with; the global one when ``None``.
        :return: the Deferred returned by the endpoint's ``connect``.
        """
        if reactor is None:
            from twisted.internet import reactor
        endpoint = endpoints.TCP4ClientEndpoint(reactor, self.host, self.port)
        return endpoint.connect(_factory)
_formatter.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import logging
import socket
import sys
from datetime import datetime
from twisted import logger
try:
import json
except ImportError:
import simplejson as json
class LogstashBaseFormatter(logging.Formatter):
    """Helpers shared by the logstash formatters for twisted.logger events.

    :param prefix: when not ``None``, only event keys starting with it are
        exported as extra fields; otherwise every key not starting with
        ``log_`` is exported.
    :param message_type: value of the message "type" field.
    :param tags: list of tags; defaults to an empty list.
    :param fqdn: use ``socket.getfqdn()`` instead of ``socket.gethostname()``.
    """

    def __init__(self, prefix=None, message_type='Logstash', tags=None, fqdn=False):
        logging.Formatter.__init__(self)
        self.prefix = prefix
        self.message_type = message_type
        self.tags = tags if tags is not None else []
        self.host = socket.getfqdn() if fqdn else socket.gethostname()

    def get_debug_fields(self, record):
        """Return exception details taken from ``record['log_failure']``.

        The failure object is only read, never modified.
        """
        failure = record['log_failure']
        try:
            traceback = failure.getTraceback()
        except Exception:
            traceback = u"(UNABLE TO OBTAIN TRACEBACK FROM EVENT)\n"
        # Read the first frame instead of popping it: popping would alter the
        # failure for everyone else and raise IndexError on an empty list.
        # NOTE(review): positions 0/1/2 are exported as module/file/lineno --
        # confirm against the frame layout of twisted's Failure.
        frames = failure.frames
        first_frame = frames[0] if frames else (None, None, None)
        return {
            'type': str(failure.type),
            'module': first_frame[0],
            'file': first_frame[1],
            'lineno': first_frame[2],
            'stack': failure.stack,
            'parents': failure.parents,
            'traceback': traceback,
        }

    def get_extra_fields(self, record):
        """Return the user supplied fields of ``record``.

        Values whose type is not in ``easy_types`` are replaced by their
        ``repr``.
        """
        if sys.version_info < (3, 0):
            easy_types = (basestring, bool, dict, float, int, long, list, type(None))
        else:
            easy_types = (str, bool, dict, float, int, list, type(None))

        fields = {}
        # Iterate over items(): iterating the dict itself yields only keys.
        for key, value in record.items():
            if self.prefix is not None:
                if not key.startswith(self.prefix):
                    continue
            elif key.startswith('log_'):
                # get every field that isn't prefixed with log_
                continue
            fields[key] = value if isinstance(value, easy_types) else repr(value)
        return fields

    @classmethod
    def format_source(cls, message_type, host, path):
        """Return the ``<message_type>://<host>/<path>`` source string."""
        return "%s://%s/%s" % (message_type, host, path)

    @classmethod
    def format_timestamp(cls, time):
        """Return ``time`` (seconds since the epoch) as UTC text with milliseconds."""
        tstamp = datetime.utcfromtimestamp(time)
        # Integer division keeps the millisecond part an int for "%03d".
        millis = tstamp.microsecond // 1000
        return tstamp.strftime("%Y-%m-%dT%H:%M:%S") + ".%03d" % millis + "Z"

    @classmethod
    def get_namespace(cls, record):
        """Return the namespace of ``record``, or a placeholder text."""
        if 'log_namespace' in record:
            return record['log_namespace']
        if 'log_logger' in record:
            return record['log_logger'].namespace
        return '(UNABLE TO OBTAIN THE NAMESPACE)'

    @classmethod
    def serialize(cls, message):
        """Return ``message`` as JSON (UTF-8 encoded bytes on Python 3)."""
        # default=repr mirrors get_extra_fields(): values nested in dicts or
        # lists that JSON cannot encode are sent as their repr, not raised on.
        text = json.dumps(message, default=repr)
        if sys.version_info < (3, 0):
            return text
        return text.encode('utf-8')
class LogstashFormatterVersion0(LogstashBaseFormatter):
    """Formatter for the version 0 layout (``@``-prefixed keys and ``@fields``)."""

    version = 0

    def format(self, record):
        """Return the serialised version 0 message for ``record``.

        :param record: event dictionary carrying ``log_time``, ``log_level``
            and the ``log_stack`` entry.
        """
        # The stack is stored under 'log_stack' (the key '@source_path' already
        # used); reading 'stack' raised KeyError for every event.
        source_path = record['log_stack'][-1][1]
        # Create message dict
        message = {
            '@timestamp': self.format_timestamp(record['log_time']),
            '@version': self.version,
            'message': logger.formatEvent(record),
            '@source': self.format_source(self.message_type, self.host,
                                          source_path),
            '@source_host': self.host,
            '@source_path': source_path,
            '@tags': self.tags,
            '@type': self.message_type,
            '@fields': {
                'levelname': record['log_level'].name,
                'logger': self.get_namespace(record),
            },
        }
        # extra fields
        message['@fields'].update(self.get_extra_fields(record))
        # exception infos
        if 'log_failure' in record:
            message['@fields'].update(self.get_debug_fields(record))
        return self.serialize(message)
class LogstashFormatterVersion1(LogstashBaseFormatter):
    """Formatter for the version 1 layout (flat message, no ``@fields``)."""

    version = 1

    def format(self, record):
        """Return the serialised version 1 message for ``record``."""
        timestamp = self.format_timestamp(record['log_time'])
        text = logger.formatEvent(record)
        # File name taken from the last entry of the recorded stack.
        path = record['log_stack'][-1][1]
        level_name = record['log_level'].name
        namespace = self.get_namespace(record)

        message = {
            '@timestamp': timestamp,
            '@version': self.version,
            'message': text,
            'host': self.host,
            'path': path,
            'tags': self.tags,
            'type': self.message_type,
            'levelname': level_name,
            'logger': namespace,
        }
        # User fields are merged at the top level, then exception details.
        message.update(self.get_extra_fields(record))
        if 'log_failure' in record:
            message.update(self.get_debug_fields(record))
        return self.serialize(message)
我正在尝试使用来自 twisted 的新日志记录 api 将日志发送到 logstash 服务器。
不使用 Twisted 时,我用的是下面这段代码:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import division, absolute_import, \
print_function, unicode_literals
import json
import logging
import logstash
# Plain stdlib logging, no Twisted involved: records go to a logstash TCP
# handler and to a console stream handler.
logger = logging.getLogger('python-logstash-logger')
logger.setLevel(logging.DEBUG)

tcp_handler = logstash.TCPLogstashHandler(b'127.0.0.1', 5001, version=1)
logger.addHandler(tcp_handler)
console_handler = logging.StreamHandler()
logger.addHandler(console_handler)

# The payload is serialised to JSON and logged as the message text.
a = {'a': 1, 'source': 'testing', '_id': 'Test'}
logger.debug(json.dumps(a))
我不明白如何按照 Twisted 的设计理念来实现这段代码。我想我应该实现一个把日志转发到 logstash 服务器的观察者(observer),但我找不到任何说明如何实现的例子。
我做了一个简单的尝试,但没有成功;如果有人能给我指出正确的方向,我将不胜感激:
from logging.config import dictConfig
from twisted.logger import Logger, STDLibLogObserver
from txacme.application import app
# Configure stdlib logging from the application settings, then route
# twisted.logger events to the stdlib logger named 'txacme'.
log_config = app.settings.get(path='txacme.log')
dictConfig(log_config)
l = Logger(observer=STDLibLogObserver('txacme'))

data = {'a': 1, 'source': 'coucou', '_id': 'Test'}
# NOTE(review): Logger.info() presumably expects a format string as its first
# argument; a dict is passed here exactly as in the original attempt -- confirm.
l.info(data)
l.error('Hello')
以及我使用的配置:
# Logging configuration read by dictConfig() via the 'txacme.log' settings path.
txacme:
  log:
    version: 1
    disable_existing_loggers: False
    formatters:
      minimal:
        format: '[%(asctime)-15s] %(message)s'
      simple:
        format: '[%(asctime)-15s][%(levelname)s][%(module)s][%(funcName)s] %(message)s'
    handlers:
      # Console output using the "simple" formatter.
      console:
        class: logging.StreamHandler
        level: !!python/name:logging.DEBUG
        formatter: simple
        stream: ext://sys.stdout
      # TCP handler from python-logstash; the remaining keys are its arguments.
      logstash_tcp: &LOGSTASH
        class: logstash.TCPLogstashHandler
        level: !!python/name:logging.DEBUG
        version: 1
        host: 127.0.0.1
        port: 5001
        message_type: acme
        tags: [acme, prod]
    loggers:
      # Only the logstash handler is attached; records do not propagate further.
      txacme: &acme
        level: !!python/name:logging.DEBUG
        handlers: [logstash_tcp]
        propagate: no
我还找到了一个针对 RabbitMQ 的 consumer 示例,但我不确定该如何把它移植到 logstash 上。
我终于创建了一个 protocol/factory 来连接到 logstash 远程服务器,以及一个异步发送数据(即发即弃)的 LogObserver:
代码的组织方式借鉴了 twisted.logger 包,可以这样使用:
# Keyword arguments for LogstashLogObserver.
settings = {
    # The address must be a string literal; a bare 10.68.0.41 is a syntax error.
    'host': '10.68.0.41',
    'port': 5001,
    'version': 1,
}
# Instantiate the observer with the keyword arguments collected in `settings`.
logstashObserver = LogstashLogObserver(**settings)
__init__.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from ._logstash import LogstashLogObserver
# Public API of the package; the single name is imported from ._logstash.
__all__ = ('LogstashLogObserver',)
_logstash.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import inspect
from twisted import logger
from twisted.internet import defer
from twisted.internet import endpoints
from twisted.internet import protocol
from twisted.internet import reactor
from twisted.internet import task
from twisted.protocols import basic
from zope import interface
from . import _formatter
# use the json event formatter from the twisted logging system
def _formatEvent(event):
    """Return ``event`` serialised as JSON text, without surrounding whitespace.

    Calls ``logger.eventAsJSON`` directly instead of building a file observer
    around a ``None`` file on every call just to borrow its formatter.

    :param event: a twisted.logger event dictionary.
    :return: the JSON text for the event.
    """
    return logger.eventAsJSON(event).strip()
class LogstashClient(basic.LineReceiver):
    """Line-based protocol that sends one serialised event, then disconnects."""

    def connectionMade(self):
        """Turn on TCP keep-alive if the transport provides the method."""
        try:
            transport = self.transport
            transport.setTcpKeepAlive(1)
        except AttributeError:
            # No setTcpKeepAlive on this transport: carry on without it.
            pass

    def emit(self, event):
        """Send ``event`` as a line, close the connection, notify the factory.

        :param event: the already serialised event to send.
        """
        self.sendLine(event)
        # One event per connection: the transport is released right away.
        self.transport.loseConnection()
        self.factory.eventEmitted(event)
class LogstashFactory(protocol.ReconnectingClientFactory):
    """Factory for LogstashClient that keeps lists of waiting Deferreds."""

    protocol = LogstashClient

    def __init__(self):
        self.connected = False
        # Deferreds fired by eventEmitted() and connectionMade() respectively.
        self.eventRequests = []
        self.clientRequests = []

    def connectionMade(self, protocol):
        """Fire every Deferred waiting for a client, passing ``protocol``."""
        # Swap the list out first so callbacks can queue new requests safely.
        pending, self.clientRequests = self.clientRequests, []
        for waiting in pending:
            waiting.callback(protocol)

    def eventEmitted(self, event):
        """Fire every Deferred waiting for an emission, passing ``event``."""
        pending, self.eventRequests = self.eventRequests, []
        for waiting in pending:
            waiting.callback(event)


# Single module-level factory, created once at import time.
# NOTE(review): the original comment says it "will reconnect automatically" --
# confirm that this holds when the factory is used through an endpoint.
_factory = LogstashFactory()
@interface.implementer(logger.ILogObserver)
class LogstashLogObserver(object):
    """twisted.logger observer that forwards each event to a logstash server.

    Every observed event is formatted and then sent over a fresh TCP
    connection on a later reactor iteration (fire and forget).

    :param host: address of the logstash server.
    :param port: TCP port of the logstash server.
    :param prefix: handed to the formatter; when set, only event keys starting
        with it are exported as extra fields.
    :param message_type: value of the message "type" field.
    :param tags: list of tags attached to every message.
    :param fqdn: use the fully qualified host name instead of the short one.
    :param version: ``1`` selects LogstashFormatterVersion1, anything else
        LogstashFormatterVersion0.
    """

    def __init__(self, host, port=5959, prefix=None, message_type='logstash',
                 tags=None, fqdn=False, version=0):
        self.host = host
        self.port = port
        if version == 1:
            formatter = _formatter.LogstashFormatterVersion1
        else:
            formatter = _formatter.LogstashFormatterVersion0
        self.formatter = formatter(prefix, message_type, tags, fqdn)

    def __call__(self, event):
        """Format ``event`` and schedule its delivery to logstash."""
        # log_ prefix is the one used by twisted, risk of collision...
        # see https://twistedmatrix.com/documents/15.2.1/core/howto/logger.html
        # Work on a shallow copy: the event dict is shared with every other
        # observer, so the stack frames must not be injected into the original.
        event = dict(event, log_stack=inspect.stack())
        payload = self.formatter.format(event)
        d = task.deferLater(reactor, 0, self._connect, reactor)
        d.addCallback(lambda client: client.emit(payload))
        # Fire and forget: a delivery failure is dropped on purpose. Leaving it
        # unhandled would make Twisted log it, which feeds this observer again.
        d.addErrback(lambda failure: None)

    def _connect(self, reactor=None):
        """Open a TCP connection to the server.

        :param reactor: reactor to connect with; the global one when ``None``.
        :return: the Deferred returned by the endpoint's ``connect``.
        """
        if reactor is None:
            from twisted.internet import reactor
        endpoint = endpoints.TCP4ClientEndpoint(reactor, self.host, self.port)
        return endpoint.connect(_factory)
_formatter.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import logging
import socket
import sys
from datetime import datetime
from twisted import logger
try:
import json
except ImportError:
import simplejson as json
class LogstashBaseFormatter(logging.Formatter):
    """Helpers shared by the logstash formatters for twisted.logger events.

    :param prefix: when not ``None``, only event keys starting with it are
        exported as extra fields; otherwise every key not starting with
        ``log_`` is exported.
    :param message_type: value of the message "type" field.
    :param tags: list of tags; defaults to an empty list.
    :param fqdn: use ``socket.getfqdn()`` instead of ``socket.gethostname()``.
    """

    def __init__(self, prefix=None, message_type='Logstash', tags=None, fqdn=False):
        logging.Formatter.__init__(self)
        self.prefix = prefix
        self.message_type = message_type
        self.tags = tags if tags is not None else []
        self.host = socket.getfqdn() if fqdn else socket.gethostname()

    def get_debug_fields(self, record):
        """Return exception details taken from ``record['log_failure']``.

        The failure object is only read, never modified.
        """
        failure = record['log_failure']
        try:
            traceback = failure.getTraceback()
        except Exception:
            traceback = u"(UNABLE TO OBTAIN TRACEBACK FROM EVENT)\n"
        # Read the first frame instead of popping it: popping would alter the
        # failure for everyone else and raise IndexError on an empty list.
        # NOTE(review): positions 0/1/2 are exported as module/file/lineno --
        # confirm against the frame layout of twisted's Failure.
        frames = failure.frames
        first_frame = frames[0] if frames else (None, None, None)
        return {
            'type': str(failure.type),
            'module': first_frame[0],
            'file': first_frame[1],
            'lineno': first_frame[2],
            'stack': failure.stack,
            'parents': failure.parents,
            'traceback': traceback,
        }

    def get_extra_fields(self, record):
        """Return the user supplied fields of ``record``.

        Values whose type is not in ``easy_types`` are replaced by their
        ``repr``.
        """
        if sys.version_info < (3, 0):
            easy_types = (basestring, bool, dict, float, int, long, list, type(None))
        else:
            easy_types = (str, bool, dict, float, int, list, type(None))

        fields = {}
        # Iterate over items(): iterating the dict itself yields only keys.
        for key, value in record.items():
            if self.prefix is not None:
                if not key.startswith(self.prefix):
                    continue
            elif key.startswith('log_'):
                # get every field that isn't prefixed with log_
                continue
            fields[key] = value if isinstance(value, easy_types) else repr(value)
        return fields

    @classmethod
    def format_source(cls, message_type, host, path):
        """Return the ``<message_type>://<host>/<path>`` source string."""
        return "%s://%s/%s" % (message_type, host, path)

    @classmethod
    def format_timestamp(cls, time):
        """Return ``time`` (seconds since the epoch) as UTC text with milliseconds."""
        tstamp = datetime.utcfromtimestamp(time)
        # Integer division keeps the millisecond part an int for "%03d".
        millis = tstamp.microsecond // 1000
        return tstamp.strftime("%Y-%m-%dT%H:%M:%S") + ".%03d" % millis + "Z"

    @classmethod
    def get_namespace(cls, record):
        """Return the namespace of ``record``, or a placeholder text."""
        if 'log_namespace' in record:
            return record['log_namespace']
        if 'log_logger' in record:
            return record['log_logger'].namespace
        return '(UNABLE TO OBTAIN THE NAMESPACE)'

    @classmethod
    def serialize(cls, message):
        """Return ``message`` as JSON (UTF-8 encoded bytes on Python 3)."""
        # default=repr mirrors get_extra_fields(): values nested in dicts or
        # lists that JSON cannot encode are sent as their repr, not raised on.
        text = json.dumps(message, default=repr)
        if sys.version_info < (3, 0):
            return text
        return text.encode('utf-8')
class LogstashFormatterVersion0(LogstashBaseFormatter):
    """Formatter for the version 0 layout (``@``-prefixed keys and ``@fields``)."""

    version = 0

    def format(self, record):
        """Return the serialised version 0 message for ``record``.

        :param record: event dictionary carrying ``log_time``, ``log_level``
            and the ``log_stack`` entry.
        """
        # The stack is stored under 'log_stack' (the key '@source_path' already
        # used); reading 'stack' raised KeyError for every event.
        source_path = record['log_stack'][-1][1]
        # Create message dict
        message = {
            '@timestamp': self.format_timestamp(record['log_time']),
            '@version': self.version,
            'message': logger.formatEvent(record),
            '@source': self.format_source(self.message_type, self.host,
                                          source_path),
            '@source_host': self.host,
            '@source_path': source_path,
            '@tags': self.tags,
            '@type': self.message_type,
            '@fields': {
                'levelname': record['log_level'].name,
                'logger': self.get_namespace(record),
            },
        }
        # extra fields
        message['@fields'].update(self.get_extra_fields(record))
        # exception infos
        if 'log_failure' in record:
            message['@fields'].update(self.get_debug_fields(record))
        return self.serialize(message)
class LogstashFormatterVersion1(LogstashBaseFormatter):
    """Formatter for the version 1 layout (flat message, no ``@fields``)."""

    version = 1

    def format(self, record):
        """Return the serialised version 1 message for ``record``."""
        timestamp = self.format_timestamp(record['log_time'])
        text = logger.formatEvent(record)
        # File name taken from the last entry of the recorded stack.
        path = record['log_stack'][-1][1]
        level_name = record['log_level'].name
        namespace = self.get_namespace(record)

        message = {
            '@timestamp': timestamp,
            '@version': self.version,
            'message': text,
            'host': self.host,
            'path': path,
            'tags': self.tags,
            'type': self.message_type,
            'levelname': level_name,
            'logger': namespace,
        }
        # User fields are merged at the top level, then exception details.
        message.update(self.get_extra_fields(record))
        if 'log_failure' in record:
            message.update(self.get_debug_fields(record))
        return self.serialize(message)