Python:节省字节以有效地记录时间戳

Python: Save bytes to log with timestamp efficiently

我每 2 秒收到一个 UDP 包,我用套接字读取它,然后将 UDP 数据提取到位数组。

位数组中的每一位对应于特定条件的 true/false 值。它们对应的条件是静态的。

我想将每个 UDP 包中的数据存储在硬盘上,以便稍后读取这些数据并及时返回查看每个条件的 true/false 状态。我一直在考虑使用 logging 模块来做这件事,但它似乎不是很有效地存储这种数据。我可以存储几乎不占用任何 space 的字节。我基本上有 60 个字节和一个时间戳,可以在很长一段时间内每 2 秒存储一次。

我希望能够将这些数据存储在一个星期的文件中。

我应该为此使用日志记录模块还是有更有效的方法我还没有找到。如果有人能给我指出一个好的图书馆,那就太好了!

此致,

贝伦德

我还没有找到这样做的图书馆,但你可以使用我为 . The data is being saved in pickled state 编写的小型 FileBuffer class,带来更少磁盘的优势 space 已采取,尽管数据不再是人类可读的。但是由于您的数据是表示布尔值数组的二进制字符串,所以这可能不是问题:

#!/usr/bin/env python3
# coding: utf-8

import os
import pickle
import time

class BinaryFileBuffer:
    def __init__(self, basename):
        self.basename = basename

    @property
    def fname(self):
    # call other function for better inherit-and-overwrite-behaviour
        return self.get_filename()

    def get_filename(self):
        return '{}.bin'.format(self.basename)

    def write(self, data):
        with open(self.fname, 'ba') as f:
            pickle.dump(data, f)

    def write_ts(self, data):
        self.write((time.time(), data))

    @property
    def size(self):
        return int(os.path.isfile(self.fname) \
                and os.path.getsize(self.fname))

    def __iter__(self):
        if self.size > 0:
            try:
                with open(self.fname, 'br') as f:
                    while True:
                        yield pickle.load(f)
            except EOFError:
                return

    def flush(self):
        if self.size > 0:
            os.remove(self.fname)

这种数据表示形式的开销非常小,当使用时间戳保存 60 字节时,您将在磁盘上获得 82 字节。假设一周内每两秒 60 字节,您最终将得到一个 23.6MB 的文件,用于 17.3MB 的原始数据。

要让此文件 "rotate",只需修改文件名以包含当前年份和星期:

class RotatingBinaryFileBuffer(BinaryFileBuffer):
    def get_filename(self):
        # append year and week number to basename
        return '{}_{}.bin'.format( \
                 self.basename, time.strftime('%Yw%W'))

可能的测试用例如下:

from random import randint
def randstr(strlen):
    # include only printable characters
    return ''.join(chr(randint(32, 126)) for i in range(strlen))
buf = RotatingBinaryFileBuffer('data')
buf.write_ts(randstr(60))
for item in buf:
    print(item)
buflen = max(i for i,_ in enumerate(buf)) + 1
print("\nBytes per entry: {}".format(buf.size/buflen))

这在某种程度上是 "quick'n'dirty" 自制的解决方案,但也许您对它有某种用途。