使用 Twisted 将日志发送到 logstash

Logging to logstash with Twisted

我正在尝试使用 Twisted 新的日志 API(twisted.logger)将日志发送到 logstash 服务器。

在不使用 Twisted 的情况下,我用的是下面这段代码:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from __future__ import division, absolute_import, \
    print_function, unicode_literals

import json
import logging
import logstash

# Plain stdlib logging: every record goes both to the logstash TCP input
# on 127.0.0.1:5001 and to the default stream handler.
logger = logging.getLogger('python-logstash-logger')
logger.setLevel(logging.DEBUG)
for handler in (
        logstash.TCPLogstashHandler(b'127.0.0.1', 5001, version=1),
        logging.StreamHandler()):
    logger.addHandler(handler)

# The payload is serialised to JSON text and logged as the message body.
a = dict(a=1, source='testing', _id='Test')
logger.debug(json.dumps(a))

我不明白如何按照 Twisted 的理念来实现这段代码。我想我应该实现一个把日志转发到 logstash 服务器的观察者(observer),但我找不到任何说明如何实现它的示例。

我做了一个简单的尝试,但没有成功;如果有人能给我指出正确的方向,我将不胜感激:

from logging.config import dictConfig
from twisted.logger import Logger, STDLibLogObserver

from txacme.application import app

# Configure stdlib logging from the 'txacme.log' section of the settings.
log_config = app.settings.get(path='txacme.log')
dictConfig(log_config)

# Hand Twisted log events to the stdlib logging system under the
# 'txacme' logger name.
l = Logger(observer=STDLibLogObserver(name='txacme'))
data = dict(a=1, source='coucou', _id='Test')
# NOTE(review): Logger.info() takes a format string as its first argument;
# passing a dict here is probably part of why this attempt fails -- confirm.
l.info(data)
l.error('Hello')

以及我使用的配置:

# Logging settings for the txacme application; the `log` mapping is the
# dictionary handed to logging.config.dictConfig().
txacme:
    log:
        version: 1
        disable_existing_loggers: False

        # Two message layouts: timestamp only, or timestamp plus
        # level/module/function.
        formatters:
          minimal:
            format: '[%(asctime)-15s] %(message)s'
          simple:
            format: '[%(asctime)-15s][%(levelname)s][%(module)s][%(funcName)s] %(message)s'

        handlers:
          # Writes records to sys.stdout using the `simple` layout.
          # NOTE(review): the !!python/name tags presumably need a YAML
          # loader that resolves Python-specific tags -- confirm.
          console:
            class: logging.StreamHandler
            level: !!python/name:logging.DEBUG
            formatter: simple
            stream: ext://sys.stdout

          # Ships records to the logstash TCP input at 127.0.0.1:5001.
          logstash_tcp: &LOGSTASH
            class: logstash.TCPLogstashHandler
            level: !!python/name:logging.DEBUG
            version: 1
            host: 127.0.0.1
            port: 5001
            message_type: acme
            tags: [acme, prod]

        loggers:
          # Only the logstash handler is attached to this logger; the
          # console handler above is defined but not referenced here.
          txacme: &acme
            level: !!python/name:logging.DEBUG
            handlers: [logstash_tcp]
            propagate: no

我还找到了一个用于 RabbitMQ 的 consumer 示例,但我不确定如何把它移植到 logstash。

我终于创建了一个 protocol/factory 来连接到 logstash 远程服务器,以及一个异步发送数据(即发即弃)的 LogObserver:

代码组织灵感来自twisted.logger包,可以这样使用:

# Connection settings for the logstash TCP input. The host must be a
# string: a bare dotted IP address is not valid Python syntax.
settings = {
    'host': '10.68.0.41',
    'port': 5001,
    'version': 1}
logstashObserver = LogstashLogObserver(**settings)

__init__.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from ._logstash import LogstashLogObserver

# Public API of the package; LogstashLogObserver comes from ._logstash.
__all__ = ('LogstashLogObserver',)

_logstash.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import inspect

from twisted import logger
from twisted.internet import defer
from twisted.internet import endpoints
from twisted.internet import protocol
from twisted.internet import reactor
from twisted.internet import task
from twisted.protocols import basic
from zope import interface

from . import _formatter


# use the json event formatter from the twisted logging system
def _formatEvent(event):
    """Return *event* rendered by Twisted's JSON log formatter, stripped.

    A throw-away JSON file observer (no output file, empty record
    separator) is built only to borrow its ``formatEvent`` callable.
    """
    json_observer = logger.jsonFileLogObserver(None, u'')
    rendered = json_observer.formatEvent(event)
    return rendered.strip()


class LogstashClient(basic.LineReceiver):
    """Line protocol that sends one formatted event and then disconnects."""

    def connectionMade(self):
        # Turn on TCP keep-alive when the transport offers it; transports
        # without ``setTcpKeepAlive`` are accepted silently.
        try:
            self.transport.setTcpKeepAlive(1)
        except AttributeError:
            pass

    def emit(self, event):
        """Send *event* as a single line, close the connection, notify the factory.

        :param event: the already-serialised event passed to ``sendLine``.
        """
        self.sendLine(event)
        # One connection per event: the transport is closed right after
        # the line has been handed over for writing.
        self.transport.loseConnection()
        self.factory.eventEmitted(event)


class LogstashFactory(protocol.ReconnectingClientFactory):
    """Factory building :class:`LogstashClient` protocols.

    It keeps two lists of pending Deferreds: ``clientRequests`` is fired
    with the protocol by :meth:`connectionMade`, ``eventRequests`` is
    fired with the event by :meth:`eventEmitted`.
    """

    protocol = LogstashClient

    def __init__(self):
        self.clientRequests = []
        self.eventRequests = []
        self.connected = False

    def connectionMade(self, protocol):
        """Fire every pending client request with *protocol*."""
        # Swap the list out first so callbacks that queue new requests do
        # not disturb the batch being fired.
        pending, self.clientRequests = self.clientRequests, []
        for waiter in pending:
            waiter.callback(protocol)

    def eventEmitted(self, event):
        """Fire every pending event request with *event*."""
        pending, self.eventRequests = self.eventRequests, []
        for waiter in pending:
            waiter.callback(event)


# Single module-level factory, created once at import time and handed to
# every endpoint.connect() call made by LogstashLogObserver._connect.
# Author's note: it will reconnect automatically.
_factory = LogstashFactory()


@interface.implementer(logger.ILogObserver)
class LogstashLogObserver(object):
    """Twisted log observer that forwards each event to a logstash server.

    Every event is formatted with one of the ``_formatter`` classes and
    sent over a new TCP connection (fire and forget).

    :param host: logstash server host.
    :param port: logstash TCP input port.
    :param prefix: passed through to the formatter.
    :param message_type: passed through to the formatter.
    :param tags: passed through to the formatter.
    :param fqdn: passed through to the formatter.
    :param version: ``1`` selects ``LogstashFormatterVersion1``; any other
        value selects ``LogstashFormatterVersion0``.
    """

    def __init__(self, host, port=5959, prefix=None, message_type='logstash',
                 tags=None, fqdn=False, version=0):
        self.host = host
        self.port = port
        formatter_cls = (_formatter.LogstashFormatterVersion1
                         if version == 1
                         else _formatter.LogstashFormatterVersion0)
        self.formatter = formatter_cls(prefix, message_type, tags, fqdn)

    def __call__(self, event):
        """Format *event* and schedule it to be sent to logstash."""
        # log_ prefix is the one used by twisted, risk of collision...
        # see https://twistedmatrix.com/documents/15.2.1/core/howto/logger.html
        event['log_stack'] = inspect.stack()
        payload = self.formatter.format(event)

        def _send(client):
            # Runs once the connection is up; ``client`` is the protocol.
            return client.emit(payload)

        # Connect on a later reactor iteration so logging never blocks.
        pending = task.deferLater(reactor, 0, self._connect, reactor)
        pending.addCallback(_send)

    def _connect(self, reactor=None):
        """Open a TCP connection to the server through the shared factory.

        :return: the Deferred returned by ``endpoint.connect``.
        """
        if reactor is None:
            from twisted.internet import reactor

        return endpoints.TCP4ClientEndpoint(
            reactor, self.host, self.port).connect(_factory)

_formatter.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function


import logging
import socket
import sys
from datetime import datetime

from twisted import logger

try:
    import json
except ImportError:
    import simplejson as json


class LogstashBaseFormatter(logging.Formatter):
    """Shared helpers for turning a Twisted log event into a logstash message.

    Subclasses provide ``format(record)``, where ``record`` is the event
    dict, and rely on the helpers below to build the individual fields.

    :param prefix: when not ``None``, only event keys starting with this
        prefix are exported as extra fields; otherwise every key that does
        not start with ``log_`` is exported.
    :param message_type: value reported as the message type.
    :param tags: list of tags attached to every message (default: empty).
    :param fqdn: report the fully qualified host name instead of the
        plain host name.
    """

    def __init__(self, prefix=None, message_type='Logstash', tags=None, fqdn=False):
        # Initialise the base class so inherited Formatter helpers work.
        logging.Formatter.__init__(self)
        self.prefix = prefix
        self.message_type = message_type
        self.tags = tags if tags is not None else []

        if fqdn:
            self.host = socket.getfqdn()
        else:
            self.host = socket.gethostname()

    def get_debug_fields(self, record):
        """Return exception details taken from ``record['log_failure']``.

        The failure object is only read, never modified. When it carries
        no frames, ``module``, ``file`` and ``lineno`` are ``None``.

        :raises KeyError: if the record has no ``log_failure`` entry.
        """
        failure = record['log_failure']

        try:
            traceback = failure.getTraceback()
        except Exception:
            traceback = u"(UNABLE TO OBTAIN TRACEBACK FROM EVENT)\n"

        # Index instead of pop(0): popping would drop the frame from the
        # failure for any later consumer and raise on an empty list.
        frames = failure.frames
        if frames:
            module, filename, lineno = frames[0][:3]
        else:
            module = filename = lineno = None

        return {
            'type': str(failure.type),
            'module': module,
            'file': filename,
            'lineno': lineno,
            'stack': failure.stack,
            'parents': failure.parents,
            'traceback': traceback,
        }

    def get_extra_fields(self, record):
        """Return the user-supplied fields of *record* as a plain dict.

        Values of simple built-in types are kept as they are; anything
        else is replaced by its ``repr()``.
        """
        if sys.version_info < (3, 0):
            easy_types = (basestring, bool, dict, float, int, long, list, type(None))
        else:
            easy_types = (str, bool, dict, float, int, list, type(None))

        prefix = self.prefix
        fields = {}
        # items() is required here: iterating the dict itself yields keys
        # only, which cannot be unpacked into (key, value).
        for key, value in record.items():
            if prefix is not None:
                if not key.startswith(prefix):
                    continue
            elif key.startswith('log_'):
                # log_ is the namespace reserved by twisted.logger.
                continue
            fields[key] = value if isinstance(value, easy_types) else repr(value)

        return fields

    @classmethod
    def format_source(cls, message_type, host, path):
        """Return the ``type://host/path`` source string."""
        return "%s://%s/%s" % (message_type, host, path)

    @classmethod
    def format_timestamp(cls, time):
        """Return *time* (seconds since the epoch) as UTC ISO-8601 with milliseconds."""
        tstamp = datetime.utcfromtimestamp(time)
        millis = tstamp.microsecond // 1000
        return tstamp.strftime("%Y-%m-%dT%H:%M:%S") + ".%03d" % millis + "Z"

    @classmethod
    def get_namespace(cls, record):
        """Return the namespace of *record*, or a placeholder when unknown."""
        if 'log_namespace' in record:
            return record['log_namespace']
        if 'log_logger' in record:
            return record['log_logger'].namespace
        return '(UNABLE TO OBTAIN THE NAMESPACE)'

    @classmethod
    def serialize(cls, message):
        """Return *message* as JSON: ``str`` on Python 2, UTF-8 bytes on Python 3."""
        encoded = json.dumps(message)
        if sys.version_info < (3, 0):
            return encoded
        return encoded.encode('utf-8')


class LogstashFormatterVersion0(LogstashBaseFormatter):
    """Formatter producing the version 0 logstash layout (``@``-prefixed keys)."""

    version = 0

    def format(self, record):
        """Serialise the event dict *record* into a version 0 message.

        The record must carry ``log_time``, ``log_level`` and the
        ``log_stack`` entry (a list of stack frames).

        :return: the result of ``serialize()`` on the message dict.
        """
        # The stack is stored under 'log_stack'; a plain 'stack' key is
        # not set anywhere and would raise KeyError.
        source_path = record['log_stack'][-1][1]

        # Create message dict
        message = {
            '@timestamp': self.format_timestamp(record['log_time']),
            '@version': self.version,
            'message': logger.formatEvent(record),
            '@source': self.format_source(self.message_type, self.host,
                                          source_path),
            '@source_host': self.host,
            '@source_path': source_path,
            '@tags': self.tags,
            '@type': self.message_type,
            '@fields': {
                'levelname': record['log_level'].name,
                'logger': self.get_namespace(record),
            },
        }

        # extra fields
        message['@fields'].update(self.get_extra_fields(record))

        # exception infos
        if 'log_failure' in record:
            message['@fields'].update(self.get_debug_fields(record))

        return self.serialize(message)


class LogstashFormatterVersion1(LogstashBaseFormatter):
    """Formatter producing the version 1 logstash layout (flat keys)."""

    version = 1

    def format(self, record):
        """Serialise the event dict *record* into a version 1 message.

        :return: the result of ``serialize()`` on the message dict.
        """
        timestamp = self.format_timestamp(record['log_time'])
        text = logger.formatEvent(record)
        # File name stored in the last entry of the captured stack.
        source_path = record['log_stack'][-1][1]
        level_name = record['log_level'].name
        namespace = self.get_namespace(record)

        # Key order is kept as-is because it shows up in the JSON output.
        message = {
            '@timestamp': timestamp,
            '@version': self.version,
            'message': text,
            'host': self.host,
            'path': source_path,
            'tags': self.tags,
            'type': self.message_type,
            'levelname': level_name,
            'logger': namespace,
        }

        # User-supplied fields are merged at the top level.
        extra = self.get_extra_fields(record)
        message.update(extra)

        # Exception details, when the event carries a failure.
        if 'log_failure' in record:
            debug = self.get_debug_fields(record)
            message.update(debug)

        return self.serialize(message)