Logging with CloudLoggingHandler from multiple processes

What is the preferred way to collect logs and send them to Google Cloud Logging when the logs are produced by multiple processes?

Here is my proposal based on CloudLoggingHandler. Would you care to critique it?

import logging

from multiprocessing import Process

import google.cloud.logging
from google.cloud.logging.handlers import CloudLoggingHandler, setup_logging

class Worker(Process):
    def __init__(self):
        super(Worker, self).__init__()

    def __setup_logger(self):
        # Each worker process builds its own handler and attaches it to the root logger.
        handler = CloudLoggingHandler(google.cloud.logging.Client(), name='log-name')
        logger = logging.getLogger()
        logger.setLevel(logging.DEBUG)
        setup_logging(handler)

    def run(self):
        self.__setup_logger()
        for i in range(10):
            logging.warning("i=%d", i)

if __name__ == "__main__":
    for _ in range(2):
        w = Worker()
        w.start()

I read about queue-based logging handlers here, but CloudLoggingHandler already batches commits in a separate thread, so a queue-based handler might be overkill. Am I right?

Sources state that CloudLoggingHandler is thread-safe, so it might be enough to have a single CloudLoggingHandler instance shared by all processes. Would that work? If so, wouldn't that be asking too much of it?
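
For reference, here is a minimal sketch of what I mean by a single shared instance (hypothetical code, assuming the fork start method on Linux so the children inherit the handler object; whether the handler's background transport still behaves correctly in the children is exactly what I am asking):

import logging
import multiprocessing

import google.cloud.logging
from google.cloud.logging.handlers import CloudLoggingHandler

# One handler created in the parent process and inherited by the children via fork.
shared_handler = CloudLoggingHandler(google.cloud.logging.Client(), name='log-name')


def work(i):
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    logger.addHandler(shared_handler)  # same handler object in every process
    logger.warning("i=%d", i)


if __name__ == "__main__":
    processes = [multiprocessing.Process(target=work, args=(i,)) for i in range(2)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()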


Edit below, in response to @thomas-schultz.

I stuck with my proposal mainly because I was prototyping: it worked "out of the box" and I had not checked for performance issues. I am reconsidering that choice.

Indeed, as far as I understand, CloudLoggingHandler with BackgroundThreadTransport blocks the main thread until the log is sent to the logging endpoint. This happens for almost every log line, since the batch is sent as soon as it contains a single log record (cf. source).

In my development environment, when several processes log concurrently, a process can end up waiting up to 1 second for its log to be sent. I guess this is mostly network cost and that it would shrink to "not so much" from inside Google's data centers.

I am considering defining a StreamHandler that would push all log records to a Queue. The queue would be read by a Process in charge of sending the logs to the logging endpoint; if relevant, that process could rely on CloudLoggingHandler to do so (see the sketch below).

Does this make sense?
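
Here is a rough sketch of that idea (hypothetical code: I use the standard library's QueueHandler in the workers instead of a custom StreamHandler, plus a dedicated consumer process that delegates to CloudLoggingHandler; names like log_consumer are mine):

import logging
import multiprocessing
from logging.handlers import QueueHandler

import google.cloud.logging
from google.cloud.logging.handlers import CloudLoggingHandler


def log_consumer(queue):
    # Dedicated process: drain the queue and forward each record to Cloud Logging.
    handler = CloudLoggingHandler(google.cloud.logging.Client(), name='log-name')
    while True:
        record = queue.get()
        if record is None:  # sentinel used to shut the consumer down
            break
        handler.handle(record)


def worker(queue, n):
    # Worker processes only push records to the queue and never touch the network.
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    logger.addHandler(QueueHandler(queue))
    for i in range(n):
        logger.warning("i=%d", i)


if __name__ == "__main__":
    log_queue = multiprocessing.Queue()

    consumer = multiprocessing.Process(target=log_consumer, args=(log_queue,))
    consumer.start()

    workers = [multiprocessing.Process(target=worker, args=(log_queue, 10)) for _ in range(2)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()

    log_queue.put(None)  # stop the consumer once all workers are done
    consumer.join()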

I think it might be overkill unless you are running into connection issues or some situation where a queue is actually needed.

In that case, you could probably use the same instance of CloudLoggingHandler everywhere, but there might be some performance bottleneck in doing so. I'm not quite sure.

Here is more on integrating with Python's standard logging module: https://googlecloudplatform.github.io/google-cloud-python/stable/logging-usage.html#integration-with-python-logging-module
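
The basic integration described there looks roughly like this (a sketch based on that page; the log name 'my-log' is just a placeholder):

import logging

import google.cloud.logging
from google.cloud.logging.handlers import CloudLoggingHandler, setup_logging

client = google.cloud.logging.Client()
handler = CloudLoggingHandler(client, name='my-log')
setup_logging(handler)  # attach the handler to the root logger

logging.getLogger().setLevel(logging.INFO)
logging.info('This record goes to Cloud Logging as well as to any other handlers.')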

I'm curious whether you came to a different answer?

Here is how I plan to log to Google Cloud Logging from multiple processes. This solution only uses the built-in Python 3 logging handlers (doc). In the example below I measure the duration of each logging call made by the main process. The results show that this solution avoids blocking the main process while logs are sent to the logging endpoint. Of course, it only helps if your time-consuming work is not done in the main process.

What do you think of this approach?

Queue: avg log call duration: 0.00004s
Queue: min log call duration: 0.00002s
Queue: max log call duration: 0.00018s

Cloud: avg log call duration: 0.03019s
Cloud: min log call duration: 0.00003s
Cloud: max log call duration: 0.16630s

Below is the complete example.

import sys
import time
import logging
import multiprocessing

import google.cloud.logging

from logging.handlers import QueueHandler, QueueListener
from google.cloud.logging.handlers import CloudLoggingHandler


def do(i):
    """
        Dummy function that times the log insertion.
    """
    t = time.time()
    logging.info('%dth message.' % i)
    return time.time() - t


if __name__ == '__main__':

    # The standard google cloud logging handler sends logs to the cloud logging endpoint.
    client = google.cloud.logging.Client()
    cloud_handler = CloudLoggingHandler(client=client, name="xyz")

    # A local handler is used to get feedback on what is going on.
    local_handler = logging.StreamHandler(sys.stdout)

    # Log records are put in the log queue.
    log_queue = multiprocessing.Queue()

    # The listener dequeues log records from the log queue. Each handler registered in the 
    # listener processes the log records.
    queue_listener = QueueListener(log_queue, local_handler, cloud_handler)
    queue_listener.start()

    # The queue handler pushes the log records to the log queue.
    queue_handler = QueueHandler(log_queue)

    # Set up the root logger with the queue handler we defined.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    root_logger.addHandler(queue_handler)

    n = 10

    # Emit logs and measure how long each call takes with the queue handler.
    durations = [do(i) for i in range(n)]
    print('Queue: avg log call duration: %.5fs' % (sum(durations) / n))
    print('Queue: min log call duration: %.5fs' % min(durations))
    print('Queue: max log call duration: %.5fs' % max(durations))

    # Stop the queue listener.
    queue_listener.stop()

    # Remove the queue handler from the root logger.
    root_logger.removeHandler(queue_handler)

    # Set up the root logger to use CloudLoggingHandler directly.
    root_logger.setLevel(logging.INFO)
    root_logger.addHandler(local_handler)
    root_logger.addHandler(cloud_handler)

    # Emit logs and measure how long each call takes with CloudLoggingHandler attached directly.
    durations = [do(i) for i in range(n)]
    print('Cloud: avg log call duration: %.5fs' % (sum(durations) / n))
    print('Cloud: min log call duration: %.5fs' % min(durations))
    print('Cloud: max log call duration: %.5fs' % max(durations))