
Fastest way to store and retrieve a large stream of small unstructured messages

我正在开发一个 IOT 应用程序,它需要我处理许多小的非结构化消息(这意味着它们的字段会随着时间的推移而改变——有些会出现,有些会消失)。这些消息通常有 2 到 15 个字段,其值属于基本数据类型(ints/longs、字符串、布尔值)。这些消息非常适合 JSON 数据格式(或 msgpack)。

按到达顺序处理消息至关重要(请理解:它们需要由单个线程处理 - 无法并行处理这部分)。我有自己的实时处理这些消息的逻辑(吞吐量相对较小,每秒最多几十万条消息),但越来越需要引擎能够 simulate/replay 以前的周期重播消息的历史。虽然它最初不是为此目的而编写的,但我的事件处理引擎(用 Go 编写)可以很好地处理每秒数十(可能是几亿)条消息,如果我能够以一定的速度向它提供历史数据的话足够的速度。

这正是问题所在。我已经在很长一段时间(几年)内存储了许多(数千亿)这样的消息,目前以分隔的 msgpack 格式 (https://github.com/msgpack/msgpack-python#streaming-unpacking)。在此设置和其他设置(见下文)中,我能够对 ~2M messages/second 的峰值解析速度进行基准测试(在 2019 Macbook Pro 上,仅解析),这远未使磁盘 IO 饱和。

即使不谈论 IO,也要执行以下操作:

import json
message = {
    'meta1': "measurement",
    'location': "NYC",
    'time': "20200101",
    'value1': 1.0,
    'value2': 2.0,
    'value3': 3.0,
    'value4': 4.0
json_message = json.dumps(message)


给我的解析时间为 3 microseconds/message,略高于 300k messages/second。与 ujson、rapidjson 和 orjson 相比,而不是标准库的 json 模块,我能够获得 1 microsecond/message 的峰值速度(使用 ujson),即大约 1M messages/second.


import msgpack
message = {
    'meta1': "measurement",
    'location': "NYC",
    'time': "20200101",
    'value1': 1.0,
    'value2': 2.0,
    'value3': 3.0,
    'value4': 4.0
msgpack_message = msgpack.packb(message)


给我的处理时间约为 750ns/消息(大约 100ns/字段),即大约 1.3M messages/second。我最初认为 C++ 可以更快。这是一个使用 nlohmann/json 的示例,尽管这不能直接与 msgpack 进行比较:

#include <iostream>
#include "json.hpp"

using json = nlohmann::json;

const std::string message = "{\"value\": \"hello\"}";

int main() {
  auto jsonMessage = json::parse(message);
  for(size_t i=0; i<1000000; ++i) {
    jsonMessage = json::parse(message);
  std::cout << jsonMessage["value"] << std::endl; // To avoid having the compiler optimize the loop away. 

使用 clang 11.0.3 (std=c++17, -O3) 编译,在同一台 Macbook 上运行时间约为 1.4s,也就是说解析速度约为 700k messages/second甚至比 Python 示例更小的消息。我知道 nlohmann/json 可能会很慢,并且能够使用 simdjson 的 DOM API.[=18 获得大约 2M messages/second 的解析速度=]

这对于我的用例来说还是太慢了。我愿意接受所有建议,以提高 Python、C++、Java(或任何 JVM 语言)或 Go 中潜在应用程序的消息解析速度。






  • 避免文本解析(速度慢,因为顺序且充满条件);
  • 避免检查消息是否格式错误(此处不需要,因为它们应该都是格式正确的);
  • 尽可能避免分配;
  • 处​​理消息块。


二进制消息包含其属性的数量(编码为 1 个字节),后跟属性列表。每个属性包含一个字符串,前缀为它的大小(编码为 1 字节),后跟属性类型(std::variant 中类型的索引,编码为 1 字节)以及属性值(大小-前缀字符串,64位整数或64位浮点数)。



#include <unordered_map>
#include <variant>
#include <climits>

// Define the possible types here
using AttrType = std::variant<std::string_view, int64_t, double>;

// Decode the `msgData` buffer and write the decoded message into `result`.
// Assume the message is not ill-formed!
// msgData must not be freed or modified while the resulting map is being used.
void decode(const char* msgData, std::unordered_map<std::string_view, AttrType>& result)
    static_assert(CHAR_BIT == 8);

    const size_t attrCount = msgData[0];
    size_t cur = 1;


    for(size_t i=0 ; i<attrCount ; ++i)
        const size_t keyLen = msgData[cur];
        std::string_view key(msgData+cur+1, keyLen);
        cur += 1 + keyLen;
        const size_t attrType = msgData[cur];

        // A switch could be better if there is more types
        if(attrType == 0) // std::string_view
            const size_t valueLen = msgData[cur];
            std::string_view value(msgData+cur+1, valueLen);
            cur += 1 + valueLen;

            result[key] = std::move(AttrType(value));
        else if(attrType == 1) // Native-endian 64-bit integer
            int64_t value;

            // Required to not break the strict aliasing rule
            std::memcpy(&value, msgData+cur, sizeof(int64_t));
            cur += sizeof(int64_t);

            result[key] = std::move(AttrType(value));
        else // IEEE-754 double
            double value;

            // Required to not break the strict aliasing rule
            std::memcpy(&value, msgData+cur, sizeof(double));
            cur += sizeof(double);

            result[key] = std::move(AttrType(value));


这是一个用法示例(基于您的 json 相关代码):

const char* message = "\x01\x05value\x00\x05hello";

void bench()
    std::unordered_map<std::string_view, AttrType> decodedMsg;

    decode(message, decodedMsg);

    for(size_t i=0; i<1000*1000; ++i)
        decode(message, decodedMsg);

    visit([](const auto& v) { cout << "Result: " << v << endl; }, decodedMsg["value"]);

在我的机器上(配备英特尔 i7-9700KF 处理器)并根据您的基准测试,使用 nlohmann json 库的代码得到 2.7M message/s 和 35.4M message/s 使用新代码。

请注意,此代码可以快得多。事实上,大部分时间都花在了高效的哈希和分配上。您可以通过使用自定义分配器使用更快的散列映射实现(例如 boost::container::flat_map 或 ska::bytell_hash_map)and/or 来缓解该问题。另一种方法是构建您自己精心调整的哈希映射实现。另一种选择是使用键值对向量并使用线性搜索来执行查找(这应该很快,因为您的消息不应该有很多属性,并且因为您说过每条消息需要一小部分属性). 但是,消息越大,解码越慢。因此,您可能需要利用并行性来更快地解码消息块。 综上所述,这有可能达到 100 M 以上 message/s.