MessagePack and datetime
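I need a fast way to send 300 short messages per second over zeromq between Python multiprocessing processes. Each message needs to contain an ID and time.time().
msgpack seems to be the best way to serialize the dict before sending it over zeromq, and conveniently, msgpack has an example of exactly what I need, except that it uses datetime.datetime.now().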
import datetime
import msgpack

useful_dict = {
    "id": 1,
    "created": datetime.datetime.now(),
}

def decode_datetime(obj):
    if b'__datetime__' in obj:
        obj = datetime.datetime.strptime(obj["as_str"], "%Y%m%dT%H:%M:%S.%f")
    return obj

def encode_datetime(obj):
    if isinstance(obj, datetime.datetime):
        return {'__datetime__': True, 'as_str': obj.strftime("%Y%m%dT%H:%M:%S.%f")}
    return obj

packed_dict = msgpack.packb(useful_dict, default=encode_datetime)
this_dict_again = msgpack.unpackb(packed_dict, object_hook=decode_datetime)
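The problem is that their example doesn't work. I get this error:
    obj = datetime.datetime.strptime(obj["as_str"], "%Y%m%dT%H:%M:%S.%f")
KeyError: 'as_str'
Maybe it's because I'm on Python 3.4, but I can't see what is wrong with strptime. Any help would be much appreciated.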
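Python 3 and Python 2 handle string encoding differently: encoding-and-decoding-strings-in-python-3-x
So you need to:
- use b'as_str' (instead of 'as_str') as the dictionary key
- use encode and decode on the stored value
Modifying the code like this works on both Python 2 and Python 3: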
import datetime
import msgpack

useful_dict = {
    "id": 1,
    "created": datetime.datetime.now(),
}

def decode_datetime(obj):
    if b'__datetime__' in obj:
        obj = datetime.datetime.strptime(obj[b'as_str'].decode(), "%Y%m%dT%H:%M:%S.%f")
    return obj

def encode_datetime(obj):
    if isinstance(obj, datetime.datetime):
        obj = {'__datetime__': True, 'as_str': obj.strftime("%Y%m%dT%H:%M:%S.%f").encode()}
    return obj

packed_dict = msgpack.packb(useful_dict, default=encode_datetime)
this_dict_again = msgpack.unpackb(packed_dict, object_hook=decode_datetime)
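Whether the keys arrive as bytes or as str depends on the msgpack version and the unpacker options. On msgpack 1.0 and later, unpackb returns str keys by default, so a check for b'__datetime__' no longer matches. A decode hook can be written to accept either form. The following is only a minimal sketch and not part of the original answer; the helper _to_text and the constant DATETIME_FORMAT are names made up for illustration.

```python
import datetime

DATETIME_FORMAT = "%Y%m%dT%H:%M:%S.%f"


def _to_text(value):
    """Return value as str, decoding UTF-8 bytes if necessary (hypothetical helper)."""
    return value.decode("utf-8") if isinstance(value, bytes) else value


def encode_datetime(obj):
    # Intended for msgpack's default= hook: called for types it cannot serialize.
    if isinstance(obj, datetime.datetime):
        return {"__datetime__": True, "as_str": obj.strftime(DATETIME_FORMAT)}
    return obj


def decode_datetime(obj):
    # Intended for msgpack's object_hook= hook: called for every decoded map.
    # Keys are normalized so the check works for both bytes and str keys.
    normalized = {_to_text(key): value for key, value in obj.items()}
    if "__datetime__" in normalized:
        return datetime.datetime.strptime(_to_text(normalized["as_str"]), DATETIME_FORMAT)
    return obj
```

These hooks would be passed to msgpack.packb(..., default=encode_datetime) and msgpack.unpackb(..., object_hook=decode_datetime) exactly as in the code above.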
Since MessagePack (import msgpack) is good at serializing integers, I created a solution that uses only integers:
import datetime
import gc

import dateutil.tz
import msgpack

_datetime_ExtType = 42

def _unpacker_hook(code, data):
    if code == _datetime_ExtType:
        values = unpack(data)
        if len(values) == 8:  # we have timezone
            return datetime.datetime(*values[:-1], tzinfo=dateutil.tz.tzoffset(None, values[-1]))
        else:
            return datetime.datetime(*values)
    return msgpack.ExtType(code, data)

# This will only get called for unknown types
def _packer_unknown_handler(obj):
    if isinstance(obj, datetime.datetime):
        if obj.tzinfo:
            components = (obj.year, obj.month, obj.day, obj.hour, obj.minute, obj.second, obj.microsecond, int(obj.utcoffset().total_seconds()))
        else:
            components = (obj.year, obj.month, obj.day, obj.hour, obj.minute, obj.second, obj.microsecond)
        # we effectively double pack the values to "compress" them
        data = msgpack.ExtType(_datetime_ExtType, pack(components))
        return data
    raise TypeError("Unknown type: {}".format(obj))

def pack(obj, **kwargs):
    # we don't use a global packer because it wouldn't be re-entrant safe
    return msgpack.packb(obj, use_bin_type=True, default=_packer_unknown_handler, **kwargs)

def unpack(payload):
    try:
        # we temporarily disable gc during unpack to bump up perf: https://pypi.python.org/pypi/msgpack-python
        gc.disable()
        # This must match the packer parameters above. NOTE: use_list=False is faster
        # encoding= applies to msgpack < 1.0; later releases use raw=False instead
        return msgpack.unpackb(payload, use_list=False, encoding='utf-8', ext_hook=_unpacker_hook)
    finally:
        gc.enable()
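The integer layout used above (seven date and time fields, plus an optional eighth field holding the UTC offset in seconds) can be tried out on its own, without msgpack. This is a rough sketch under one assumption that differs from the answer: it uses the standard library's datetime.timezone in place of dateutil.tz.tzoffset. The function names datetime_to_ints and ints_to_datetime are hypothetical.

```python
import datetime


def datetime_to_ints(dt):
    """Flatten a datetime into 7 ints, plus a UTC offset in seconds if tz-aware."""
    parts = (dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second, dt.microsecond)
    offset = dt.utcoffset()  # None for naive datetimes
    if offset is not None:
        parts += (int(offset.total_seconds()),)
    return parts


def ints_to_datetime(parts):
    """Rebuild the datetime; an 8th element is read as the UTC offset in seconds."""
    if len(parts) == 8:
        tz = datetime.timezone(datetime.timedelta(seconds=parts[7]))
        return datetime.datetime(*parts[:7], tzinfo=tz)
    return datetime.datetime(*parts)
```

Only the offset survives the round trip, not the time zone name or its DST rules, which is the same trade-off the tzoffset-based code makes.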
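The documentation says that the default encoding for unicode strings in packb (and pack) is utf-8. So you can simply look for the unicode string '__datetime__' in the decode_datetime function instead of the bytes object b'__datetime__', and add the encoding='utf-8' argument to unpackb. Like this: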
def decode_datetime(obj):
    if '__datetime__' in obj:
        obj = datetime.datetime.strptime(obj["as_str"], "%Y%m%dT%H:%M:%S.%f")
    return obj

packed_dict = msgpack.packb(useful_dict, default=encode_datetime)
this_dict_again = msgpack.unpackb(packed_dict, object_hook=decode_datetime, encoding='utf-8')
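Note that the encoding argument was removed from unpackb in msgpack 1.0. The raw=False option, which is the default from that release on, has the same effect of returning str instead of bytes. Below is a sketch of the same round trip under that assumption (a msgpack release that supports raw=False). The constant DATETIME_FORMAT and the TypeError raised for unsupported types are additions for illustration, not part of the original answer.

```python
import datetime

import msgpack

DATETIME_FORMAT = "%Y%m%dT%H:%M:%S.%f"


def encode_datetime(obj):
    if isinstance(obj, datetime.datetime):
        return {"__datetime__": True, "as_str": obj.strftime(DATETIME_FORMAT)}
    # Report any other unsupported type instead of handing it back unchanged.
    raise TypeError("Cannot serialize {!r}".format(obj))


def decode_datetime(obj):
    # With raw=False the keys are str, so a plain string check is enough.
    if "__datetime__" in obj:
        return datetime.datetime.strptime(obj["as_str"], DATETIME_FORMAT)
    return obj


useful_dict = {"id": 1, "created": datetime.datetime.now()}

packed_dict = msgpack.packb(useful_dict, default=encode_datetime, use_bin_type=True)
# raw=False decodes msgpack strings to str rather than leaving them as bytes.
this_dict_again = msgpack.unpackb(packed_dict, object_hook=decode_datetime, raw=False)
```

The string format keeps microseconds, so this_dict_again should compare equal to useful_dict for a naive datetime.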