存储和检索大量小型非结构化消息的最快方法
Fastest way to store and retrieve a large stream of small unstructured messages
我正在开发一个 IOT 应用程序,它需要我处理许多小的非结构化消息(这意味着它们的字段会随着时间的推移而改变——有些会出现,有些会消失)。这些消息通常有 2 到 15 个字段,其值属于基本数据类型(ints/longs、字符串、布尔值)。这些消息非常适合 JSON 数据格式(或 msgpack)。
按到达顺序处理消息至关重要(请理解:它们需要由单个线程处理 - 无法并行处理这部分)。我有自己的实时处理这些消息的逻辑(吞吐量相对较小,每秒最多几十万条消息),但越来越需要引擎能够 simulate/replay 以前的周期重播消息的历史。虽然它最初不是为此目的而编写的,但我的事件处理引擎(用 Go 编写)可以很好地处理每秒数十(可能是几亿)条消息,如果我能够以一定的速度向它提供历史数据的话足够的速度。
这正是问题所在。我已经在很长一段时间(几年)内存储了许多(数千亿)这样的消息,目前以分隔的 msgpack 格式 (https://github.com/msgpack/msgpack-python#streaming-unpacking)。在此设置和其他设置(见下文)中,我能够对 ~2M messages/second 的峰值解析速度进行基准测试(在 2019 Macbook Pro 上,仅解析),这远未使磁盘 IO 饱和。
即使不谈论 IO,也要执行以下操作:
import json
message = {
'meta1': "measurement",
'location': "NYC",
'time': "20200101",
'value1': 1.0,
'value2': 2.0,
'value3': 3.0,
'value4': 4.0
}
json_message = json.dumps(message)
%%timeit
json.loads(json_message)
给我的解析时间为 3 microseconds/message,略高于 300k messages/second。与 ujson、rapidjson 和 orjson 相比,而不是标准库的 json
模块,我能够获得 1 microsecond/message 的峰值速度(使用 ujson),即大约 1M messages/second.
Msgpack稍微好一点:
import msgpack
message = {
'meta1': "measurement",
'location': "NYC",
'time': "20200101",
'value1': 1.0,
'value2': 2.0,
'value3': 3.0,
'value4': 4.0
}
msgpack_message = msgpack.packb(message)
%%timeit
msgpack.unpackb(msgpack_message)
给我的处理时间约为 750ns/消息(大约 100ns/字段),即大约 1.3M messages/second。我最初认为 C++ 可以更快。这是一个使用 nlohmann/json 的示例,尽管这不能直接与 msgpack 进行比较:
#include <iostream>
#include "json.hpp"
using json = nlohmann::json;
const std::string message = "{\"value\": \"hello\"}";
int main() {
auto jsonMessage = json::parse(message);
for(size_t i=0; i<1000000; ++i) {
jsonMessage = json::parse(message);
}
std::cout << jsonMessage["value"] << std::endl; // To avoid having the compiler optimize the loop away.
};
使用 clang 11.0.3 (std=c++17, -O3) 编译,在同一台 Macbook 上运行时间约为 1.4s,也就是说解析速度约为 700k messages/second甚至比 Python 示例更小的消息。我知道 nlohmann/json 可能会很慢,并且能够使用 simdjson 的 DOM API.[=18 获得大约 2M messages/second 的解析速度=]
这对于我的用例来说还是太慢了。我愿意接受所有建议,以提高 Python、C++、Java(或任何 JVM 语言)或 Go 中潜在应用程序的消息解析速度。
备注:
- 我不一定关心磁盘上消息的大小(如果您建议的存储方法是内存高效的,我会考虑加分)。
- 我只需要一个基本数据类型的键值模型——我不需要嵌套的字典或列表。
- 转换现有数据根本不是问题。我只是在寻找阅读优化的东西。
- 我不一定需要将整个内容解析为结构或自定义对象,仅在需要时访问某些字段(我通常需要每条消息的一小部分字段)- 它如果这带有惩罚,那很好,只要惩罚不会破坏整个应用程序的吞吐量。
- 我对 custom/slightly 不安全的解决方案持开放态度。
- 我选择使用的任何格式都需要自然分隔,因为消息将连续写入一个文件(我目前每天使用一个文件,这对我的用例来说已经足够了)。我过去遇到过错误分隔消息的问题(请参阅 Java Protobuf API 中的 writeDelimitedTo - 丢失一个字节,整个文件都被毁了)。
我已经探索过的东西:
- JSON: 试验过 rapidjson, simdjson, nlohmann/json, 等等...)
- 带分隔符 msgpack 的平面文件(参见 API:https://github.com/msgpack/msgpack-python#streaming-unpacking):我目前用来存储消息的内容。
- 协议缓冲区:稍微快一些,但并不真正适合数据的非结构化性质。
谢谢!!
我假设消息只包含一些基本类型的命名属性(在运行时定义),并且这些基本类型例如字符串、整数和浮点数。
为了快速执行,最好是:
- 避免文本解析(速度慢,因为顺序且充满条件);
- 避免检查消息是否格式错误(此处不需要,因为它们应该都是格式正确的);
- 尽可能避免分配;
- 处理消息块。
因此,我们首先需要设计一个简单快速的二进制消息协议:
二进制消息包含其属性的数量(编码为 1 个字节),后跟属性列表。每个属性包含一个字符串,前缀为它的大小(编码为 1 字节),后跟属性类型(std::variant 中类型的索引,编码为 1 字节)以及属性值(大小-前缀字符串,64位整数或64位浮点数)。
每个编码消息都是一个字节流,可以放入一个大缓冲区(分配一次并重复用于多个传入消息)。
这里是解码来自原始二进制缓冲区的消息的代码:
#include <unordered_map>
#include <variant>
#include <climits>
// Define the possible types here
using AttrType = std::variant<std::string_view, int64_t, double>;
// Decode the `msgData` buffer and write the decoded message into `result`.
// Assume the message is not ill-formed!
// msgData must not be freed or modified while the resulting map is being used.
void decode(const char* msgData, std::unordered_map<std::string_view, AttrType>& result)
{
static_assert(CHAR_BIT == 8);
const size_t attrCount = msgData[0];
size_t cur = 1;
result.clear();
for(size_t i=0 ; i<attrCount ; ++i)
{
const size_t keyLen = msgData[cur];
std::string_view key(msgData+cur+1, keyLen);
cur += 1 + keyLen;
const size_t attrType = msgData[cur];
cur++;
// A switch could be better if there is more types
if(attrType == 0) // std::string_view
{
const size_t valueLen = msgData[cur];
std::string_view value(msgData+cur+1, valueLen);
cur += 1 + valueLen;
result[key] = std::move(AttrType(value));
}
else if(attrType == 1) // Native-endian 64-bit integer
{
int64_t value;
// Required to not break the strict aliasing rule
std::memcpy(&value, msgData+cur, sizeof(int64_t));
cur += sizeof(int64_t);
result[key] = std::move(AttrType(value));
}
else // IEEE-754 double
{
double value;
// Required to not break the strict aliasing rule
std::memcpy(&value, msgData+cur, sizeof(double));
cur += sizeof(double);
result[key] = std::move(AttrType(value));
}
}
}
你可能也需要编写编码函数(基于相同的想法)。
这是一个用法示例(基于您的 json 相关代码):
const char* message = "\x01\x05value\x00\x05hello";
void bench()
{
std::unordered_map<std::string_view, AttrType> decodedMsg;
decodedMsg.reserve(16);
decode(message, decodedMsg);
for(size_t i=0; i<1000*1000; ++i)
{
decode(message, decodedMsg);
}
visit([](const auto& v) { cout << "Result: " << v << endl; }, decodedMsg["value"]);
}
在我的机器上(配备英特尔 i7-9700KF 处理器)并根据您的基准测试,使用 nlohmann json 库的代码得到 2.7M message/s 和 35.4M message/s 使用新代码。
请注意,此代码可以快得多。事实上,大部分时间都花在了高效的哈希和分配上。您可以通过使用自定义分配器使用更快的散列映射实现(例如 boost::container::flat_map 或 ska::bytell_hash_map)and/or 来缓解该问题。另一种方法是构建您自己精心调整的哈希映射实现。另一种选择是使用键值对向量并使用线性搜索来执行查找(这应该很快,因为您的消息不应该有很多属性,并且因为您说过每条消息需要一小部分属性).
但是,消息越大,解码越慢。因此,您可能需要利用并行性来更快地解码消息块。
综上所述,这有可能达到 100 M 以上 message/s.
我正在开发一个 IOT 应用程序,它需要我处理许多小的非结构化消息(这意味着它们的字段会随着时间的推移而改变——有些会出现,有些会消失)。这些消息通常有 2 到 15 个字段,其值属于基本数据类型(ints/longs、字符串、布尔值)。这些消息非常适合 JSON 数据格式(或 msgpack)。
按到达顺序处理消息至关重要(请理解:它们需要由单个线程处理 - 无法并行处理这部分)。我有自己的实时处理这些消息的逻辑(吞吐量相对较小,每秒最多几十万条消息),但越来越需要引擎能够 simulate/replay 以前的周期重播消息的历史。虽然它最初不是为此目的而编写的,但我的事件处理引擎(用 Go 编写)可以很好地处理每秒数十(可能是几亿)条消息,如果我能够以一定的速度向它提供历史数据的话足够的速度。
这正是问题所在。我已经在很长一段时间(几年)内存储了许多(数千亿)这样的消息,目前以分隔的 msgpack 格式 (https://github.com/msgpack/msgpack-python#streaming-unpacking)。在此设置和其他设置(见下文)中,我能够对 ~2M messages/second 的峰值解析速度进行基准测试(在 2019 Macbook Pro 上,仅解析),这远未使磁盘 IO 饱和。
即使不谈论 IO,也要执行以下操作:
import json
message = {
'meta1': "measurement",
'location': "NYC",
'time': "20200101",
'value1': 1.0,
'value2': 2.0,
'value3': 3.0,
'value4': 4.0
}
json_message = json.dumps(message)
%%timeit
json.loads(json_message)
给我的解析时间为 3 microseconds/message,略高于 300k messages/second。与 ujson、rapidjson 和 orjson 相比,而不是标准库的 json
模块,我能够获得 1 microsecond/message 的峰值速度(使用 ujson),即大约 1M messages/second.
Msgpack稍微好一点:
import msgpack
message = {
'meta1': "measurement",
'location': "NYC",
'time': "20200101",
'value1': 1.0,
'value2': 2.0,
'value3': 3.0,
'value4': 4.0
}
msgpack_message = msgpack.packb(message)
%%timeit
msgpack.unpackb(msgpack_message)
给我的处理时间约为 750ns/消息(大约 100ns/字段),即大约 1.3M messages/second。我最初认为 C++ 可以更快。这是一个使用 nlohmann/json 的示例,尽管这不能直接与 msgpack 进行比较:
#include <iostream>
#include "json.hpp"
using json = nlohmann::json;
const std::string message = "{\"value\": \"hello\"}";
int main() {
auto jsonMessage = json::parse(message);
for(size_t i=0; i<1000000; ++i) {
jsonMessage = json::parse(message);
}
std::cout << jsonMessage["value"] << std::endl; // To avoid having the compiler optimize the loop away.
};
使用 clang 11.0.3 (std=c++17, -O3) 编译,在同一台 Macbook 上运行时间约为 1.4s,也就是说解析速度约为 700k messages/second甚至比 Python 示例更小的消息。我知道 nlohmann/json 可能会很慢,并且能够使用 simdjson 的 DOM API.[=18 获得大约 2M messages/second 的解析速度=]
这对于我的用例来说还是太慢了。我愿意接受所有建议,以提高 Python、C++、Java(或任何 JVM 语言)或 Go 中潜在应用程序的消息解析速度。
备注:
- 我不一定关心磁盘上消息的大小(如果您建议的存储方法是内存高效的,我会考虑加分)。
- 我只需要一个基本数据类型的键值模型——我不需要嵌套的字典或列表。
- 转换现有数据根本不是问题。我只是在寻找阅读优化的东西。
- 我不一定需要将整个内容解析为结构或自定义对象,仅在需要时访问某些字段(我通常需要每条消息的一小部分字段)- 它如果这带有惩罚,那很好,只要惩罚不会破坏整个应用程序的吞吐量。
- 我对 custom/slightly 不安全的解决方案持开放态度。
- 我选择使用的任何格式都需要自然分隔,因为消息将连续写入一个文件(我目前每天使用一个文件,这对我的用例来说已经足够了)。我过去遇到过错误分隔消息的问题(请参阅 Java Protobuf API 中的 writeDelimitedTo - 丢失一个字节,整个文件都被毁了)。
我已经探索过的东西:
- JSON: 试验过 rapidjson, simdjson, nlohmann/json, 等等...)
- 带分隔符 msgpack 的平面文件(参见 API:https://github.com/msgpack/msgpack-python#streaming-unpacking):我目前用来存储消息的内容。
- 协议缓冲区:稍微快一些,但并不真正适合数据的非结构化性质。
谢谢!!
我假设消息只包含一些基本类型的命名属性(在运行时定义),并且这些基本类型例如字符串、整数和浮点数。
为了快速执行,最好是:
- 避免文本解析(速度慢,因为顺序且充满条件);
- 避免检查消息是否格式错误(此处不需要,因为它们应该都是格式正确的);
- 尽可能避免分配;
- 处理消息块。
因此,我们首先需要设计一个简单快速的二进制消息协议:
二进制消息包含其属性的数量(编码为 1 个字节),后跟属性列表。每个属性包含一个字符串,前缀为它的大小(编码为 1 字节),后跟属性类型(std::variant 中类型的索引,编码为 1 字节)以及属性值(大小-前缀字符串,64位整数或64位浮点数)。
每个编码消息都是一个字节流,可以放入一个大缓冲区(分配一次并重复用于多个传入消息)。
这里是解码来自原始二进制缓冲区的消息的代码:
#include <unordered_map>
#include <variant>
#include <climits>
// Define the possible types here
using AttrType = std::variant<std::string_view, int64_t, double>;
// Decode the `msgData` buffer and write the decoded message into `result`.
// Assume the message is not ill-formed!
// msgData must not be freed or modified while the resulting map is being used.
void decode(const char* msgData, std::unordered_map<std::string_view, AttrType>& result)
{
static_assert(CHAR_BIT == 8);
const size_t attrCount = msgData[0];
size_t cur = 1;
result.clear();
for(size_t i=0 ; i<attrCount ; ++i)
{
const size_t keyLen = msgData[cur];
std::string_view key(msgData+cur+1, keyLen);
cur += 1 + keyLen;
const size_t attrType = msgData[cur];
cur++;
// A switch could be better if there is more types
if(attrType == 0) // std::string_view
{
const size_t valueLen = msgData[cur];
std::string_view value(msgData+cur+1, valueLen);
cur += 1 + valueLen;
result[key] = std::move(AttrType(value));
}
else if(attrType == 1) // Native-endian 64-bit integer
{
int64_t value;
// Required to not break the strict aliasing rule
std::memcpy(&value, msgData+cur, sizeof(int64_t));
cur += sizeof(int64_t);
result[key] = std::move(AttrType(value));
}
else // IEEE-754 double
{
double value;
// Required to not break the strict aliasing rule
std::memcpy(&value, msgData+cur, sizeof(double));
cur += sizeof(double);
result[key] = std::move(AttrType(value));
}
}
}
你可能也需要编写编码函数(基于相同的想法)。
这是一个用法示例(基于您的 json 相关代码):
const char* message = "\x01\x05value\x00\x05hello";
void bench()
{
std::unordered_map<std::string_view, AttrType> decodedMsg;
decodedMsg.reserve(16);
decode(message, decodedMsg);
for(size_t i=0; i<1000*1000; ++i)
{
decode(message, decodedMsg);
}
visit([](const auto& v) { cout << "Result: " << v << endl; }, decodedMsg["value"]);
}
在我的机器上(配备英特尔 i7-9700KF 处理器)并根据您的基准测试,使用 nlohmann json 库的代码得到 2.7M message/s 和 35.4M message/s 使用新代码。
请注意,此代码可以快得多。事实上,大部分时间都花在了高效的哈希和分配上。您可以通过使用自定义分配器使用更快的散列映射实现(例如 boost::container::flat_map 或 ska::bytell_hash_map)and/or 来缓解该问题。另一种方法是构建您自己精心调整的哈希映射实现。另一种选择是使用键值对向量并使用线性搜索来执行查找(这应该很快,因为您的消息不应该有很多属性,并且因为您说过每条消息需要一小部分属性). 但是,消息越大,解码越慢。因此,您可能需要利用并行性来更快地解码消息块。 综上所述,这有可能达到 100 M 以上 message/s.