MessagePack and datetime

I need a fast way to send 300 short messages per second over zeromq between Python multiprocessing processes. Each message needs to contain an ID and time.time().

msgpack seems to be the best way to serialize the dict before sending it over zeromq, and conveniently msgpack has an example of exactly what I need, except that it uses datetime.datetime.now():

import datetime

import msgpack

useful_dict = {
    "id": 1,
    "created": datetime.datetime.now(),
}

def decode_datetime(obj):
    if b'__datetime__' in obj:
        obj = datetime.datetime.strptime(obj["as_str"], "%Y%m%dT%H:%M:%S.%f")
    return obj

def encode_datetime(obj):
    if isinstance(obj, datetime.datetime):
        return {'__datetime__': True, 'as_str': obj.strftime("%Y%m%dT%H:%M:%S.%f")}
    return obj


packed_dict = msgpack.packb(useful_dict, default=encode_datetime)
this_dict_again = msgpack.unpackb(packed_dict, object_hook=decode_datetime)

The problem is that their example does not work. I get this error:

    obj = datetime.datetime.strptime(obj["as_str"], "%Y%m%dT%H:%M:%S.%f")
KeyError: 'as_str'

Perhaps it is because I am on Python 3.4, but I don't see what is wrong with strptime. Any help would be much appreciated.

Python 3 and Python 2 handle string encoding differently: encoding-and-decoding-strings-in-python-3-x

So you need to:

  • use b'as_str' (instead of 'as_str') as the dictionary key
  • use encode and decode for the stored value
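
To see why the lookup in the question fails, here is a small sketch using a hand-built dict that mimics what unpackb returned in this situation (bytes keys and bytes values). The dict and the sample timestamp are made up for illustration; no msgpack call is involved:

```python
import datetime

# Hand-built stand-in for the unpacked map: on Python 3, raw msgpack
# strings came back as bytes, so the keys and the string value are bytes.
unpacked = {b"__datetime__": True, b"as_str": b"20150501T12:30:15.123456"}

# A str key and a bytes key are different dictionary keys on Python 3.
has_str_key = "as_str" in unpacked
has_bytes_key = b"as_str" in unpacked

# Look up with the bytes key, then decode the value to str for strptime.
parsed = datetime.datetime.strptime(
    unpacked[b"as_str"].decode(), "%Y%m%dT%H:%M:%S.%f"
)
print(has_str_key, has_bytes_key, parsed)
```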

Modifying the code like this works on both Python 2 and Python 3:

import datetime
import msgpack

useful_dict = {
    "id": 1,
    "created": datetime.datetime.now(),
}

def decode_datetime(obj):
    if b'__datetime__' in obj:
        obj = datetime.datetime.strptime(obj[b'as_str'].decode(), "%Y%m%dT%H:%M:%S.%f")
    return obj

def encode_datetime(obj):
    if isinstance(obj, datetime.datetime):
        obj = {'__datetime__': True, 'as_str': obj.strftime("%Y%m%dT%H:%M:%S.%f").encode()}
    return obj


packed_dict = msgpack.packb(useful_dict, default=encode_datetime)
this_dict_again = msgpack.unpackb(packed_dict, object_hook=decode_datetime)

Given that MessagePack (import msgpack) is good at serializing integers, I created a solution that uses only integers:

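import datetime
import gc

import dateutil.tz  # third-party: python-dateutil
import msgpack

# application-specific ExtType code used to tag packed datetimes
_datetime_ExtType = 42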

def _unpacker_hook(code, data):
    if code == _datetime_ExtType:
        values = unpack(data)

        if len(values) == 8:  # we have timezone
            return datetime.datetime(*values[:-1], dateutil.tz.tzoffset(None, values[-1]))
        else:
            return datetime.datetime(*values)

    return msgpack.ExtType(code, data)


# This will only get called for unknown types
def _packer_unknown_handler(obj):
    if isinstance(obj, datetime.datetime):
        if obj.tzinfo:
            components = (obj.year, obj.month, obj.day, obj.hour, obj.minute, obj.second, obj.microsecond, int(obj.utcoffset().total_seconds()))
        else:
            components = (obj.year, obj.month, obj.day, obj.hour, obj.minute, obj.second, obj.microsecond)

        # we effectively double pack the values to "compress" them
        data = msgpack.ExtType(_datetime_ExtType, pack(components))
        return data

    raise TypeError("Unknown type: {}".format(obj))

def pack(obj, **kwargs):
    # we don't use a global packer because it wouldn't be re-entrant safe
    return msgpack.packb(obj, use_bin_type=True, default=_packer_unknown_handler, **kwargs)


def unpack(payload):
    try:
        # we temporarily disable gc during unpack to bump up perf: https://pypi.python.org/pypi/msgpack-python
        gc.disable()
        # These options must match the packer parameters above.  NOTE: use_list=False (tuples) is faster
        # raw=False decodes strings as UTF-8; it replaces encoding='utf-8', which msgpack 1.0 removed
        return msgpack.unpackb(payload, use_list=False, raw=False, ext_hook=_unpacker_hook)
    finally:
        gc.enable()
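
The integer-component idea can be checked on its own, without msgpack. The sketch below is only an illustration of the tuple layout used above: the helper names to_components and from_components are hypothetical, and it swaps dateutil.tz.tzoffset for the standard library's datetime.timezone so that it has no third-party dependency.

```python
import datetime


def to_components(dt):
    """Flatten a datetime into 7 integers, plus the UTC offset in seconds if aware."""
    parts = (dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second, dt.microsecond)
    if dt.tzinfo is not None:
        parts += (int(dt.utcoffset().total_seconds()),)
    return parts


def from_components(values):
    """Rebuild a datetime from the tuple produced by to_components."""
    if len(values) == 8:  # the 8th value is the UTC offset in seconds
        tz = datetime.timezone(datetime.timedelta(seconds=values[-1]))
        return datetime.datetime(*values[:7], tzinfo=tz)
    return datetime.datetime(*values)


naive = datetime.datetime(2015, 5, 1, 12, 30, 15, 123456)
aware = naive.replace(tzinfo=datetime.timezone(datetime.timedelta(hours=2)))

naive_again = from_components(to_components(naive))
aware_again = from_components(to_components(aware))
```

Only whole-second offsets survive this round trip, since the offset is truncated to an integer, which is the same trade-off the answer's code makes.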

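The documentation says that the default encoding for unicode strings in packb (and pack) is utf-8. In decode_datetime you can simply look for the unicode string '__datetime__' instead of the bytes object b'__datetime__', and then pass the encoding='utf-8' argument to unpackb. Like this: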

def decode_datetime(obj):
    if '__datetime__' in obj:
        obj = datetime.datetime.strptime(obj["as_str"], "%Y%m%dT%H:%M:%S.%f")
    return obj

packed_dict = msgpack.packb(useful_dict, default=encode_datetime)
this_dict_again = msgpack.unpackb(packed_dict, object_hook=decode_datetime, encoding='utf-8')
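
For readers on msgpack 1.0 or later: the encoding argument was removed from unpackb there, and raw=False (the default in those versions) yields the same str keys. The following is a minimal sketch of the same round trip under that assumption. It needs a msgpack release that supports the raw option, and the FMT constant and fixed timestamp are illustrative only.

```python
import datetime

import msgpack

FMT = "%Y%m%dT%H:%M:%S.%f"


def encode_datetime(obj):
    # Called by packb only for types msgpack cannot serialize itself.
    if isinstance(obj, datetime.datetime):
        return {"__datetime__": True, "as_str": obj.strftime(FMT)}
    return obj


def decode_datetime(obj):
    # Called by unpackb for every unpacked map; keys are str because raw=False.
    if "__datetime__" in obj:
        return datetime.datetime.strptime(obj["as_str"], FMT)
    return obj


original = {"id": 1, "created": datetime.datetime(2015, 5, 1, 12, 30, 15, 123456)}

# use_bin_type=True keeps str and bytes distinct on the wire.
packed = msgpack.packb(original, default=encode_datetime, use_bin_type=True)
restored = msgpack.unpackb(packed, object_hook=decode_datetime, raw=False)
```

Passing use_bin_type=True and raw=False explicitly keeps the behaviour the same on older releases where those were not yet the defaults.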