boost::asio::async_read_until with custom match_char to accept only JSON format

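I've been trying to change the match_char function so that it accepts only JSON messages when reading data from a socket.

I have 2 implementations (one does not work; the other works, but I don't think it is efficient).

1- First approach (working)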

    typedef boost::asio::buffers_iterator<boost::asio::streambuf::const_buffers_type> buffer_iterator;

    static std::pair<buffer_iterator, bool> match_json2(const buffer_iterator begin,
                                                            const buffer_iterator end) {
        buffer_iterator i = begin;
        while (i != end) {
            if ((*i == ']') || (*i == '}')) {
                return std::make_pair(i, true);
            }
            ++i;
        }
        return std::make_pair(i, false);
    }

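With this definition, I read in a loop and rebuild the JSON. This version works, but if I receive a message that is not valid JSON, I stay in the loop, cannot clear tmp_response, and never recover from it...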

        std::string read_buffer_string() {
            std::string response;
            bool keepReading = true;
            while (keepReading) {
                std::string tmp_response;
                async_read_until(s, ba::dynamic_buffer(tmp_response), match_json2, yc);
                if (!tmp_response.empty()) {
                    response += tmp_response;
                    if (nlohmann::json::accept(response)) {
                        keepReading = false;
                    }
                }
            }
            return response;
        }
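
2- Second approach (not working). Ideally I would like something like this (this implementation doesn't work because the begin iterator doesn't always point to the start of the message - I guess some data has already been transferred to the buffer - and therefore match_json returns invalid values).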

     static std::pair<buffer_iterator, bool> match_json(const buffer_iterator begin,
                                                             const buffer_iterator end) {
         buffer_iterator i = begin;
         while (i != end) {
             if ((*i == ']') || (*i == '}')) {
                 std::string _message(begin, i);
                 std::cout << _message << std::endl;
                 if (nlohmann::json::accept(_message)) {
                     return std::make_pair(i, true);
                 }
             }
             ++i;
         }
         return std::make_pair(i, false);
     }
    

And then call it like this:

        std::string read_buffer_string() {
            std::string response;
            async_read_until(s, ba::dynamic_buffer(response), match_json, yc);
            return response;
        }

Does anyone know a more efficient way to do this? Thanks in advance! :)

TL;DR

Seriously, just add framing to your wire protocol. E.g. even HTTP responses do this (e.g. via the content length headers, and maybe chunked encoding)

UPDATE:

Instead of hand-rolling, you can go with Boost JSON, as I added in another answer.


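The first approach is flawed because you are using "async_read_until" yet treat the operation as if it were synchronous.

The second problem is that neither json::parse nor json::accept can report the location of a complete/broken parse. This means that you really do need framing in your wire protocol, because you CANNOT detect message boundaries.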
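To make the framing idea concrete, here is a minimal sketch of delimiter-based framing. It assumes (this is not in the question) that the sender writes each JSON document compactly on a single line terminated by '\n'. That assumption is compatible with valid JSON, because raw newlines may only appear as whitespace between tokens; inside strings they must be escaped. The helper name extract_frames is made up for illustration. With Asio, the matching read would presumably be the plain delimiter form, async_read_until(s, buf, '\n', ...), instead of a custom match condition.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical helper (not part of Asio or nlohmann::json): removes every
// complete '\n'-terminated frame from `buffer` and returns the frames without
// their terminator. Bytes after the last '\n' stay in `buffer`, so the next
// read can complete them.
inline std::vector<std::string> extract_frames(std::string& buffer) {
    std::vector<std::string> frames;
    std::size_t start = 0;
    for (;;) {
        std::size_t const nl = buffer.find('\n', start);
        if (nl == std::string::npos)
            break;                                   // no complete frame left
        frames.emplace_back(buffer, start, nl - start);
        start = nl + 1;                              // skip the delimiter
    }
    buffer.erase(0, start);                          // keep only the partial tail
    return frames;
}
```

Each returned frame can then be handed to json::parse (or json::accept) on its own, so an invalid message costs you exactly one frame and the stream stays in sync.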

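The rest of this answer will first dive in to expose how the limitations of the nlohmann::json library make your task impossible¹.

So even though it is commendable that you use an existing library, we look for alternatives.

Making It Work(?)

You could use the approach that Beast uses (http::read(s, buf, http::message<>)). That is: keep a reference to the entire buffer.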

flat_buffer buf;
http::request<http::empty_body> m;
read(s, buf, m); // s is a SyncStream, like a socket

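Here, read is a composed operation over the message as well as the buffer. This makes it easy to check the completion criteria. In our case, let's make a reader that also serves as the match condition: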

template <typename DynamicBuffer_v1>
struct JsonReader {
    DynamicBuffer_v1 _buf;
    nlohmann::json message;

    JsonReader(DynamicBuffer_v1 buf) : _buf(buf) {}

    template <typename It>
    auto operator()(It dummy, It) {
        using namespace nlohmann;

        auto f = buffers_begin(_buf.data());
        auto l = buffers_end(_buf.data());
        bool ok = json::accept(f, l);
        if (ok) {
            auto n = [&] {
                std::istringstream iss(std::string(f, l));
                message = json::parse(iss);
                return iss.tellg(); // detect consumed
            }();

            _buf.consume(n);
            assert(n);
            std::advance(dummy, n);
            return std::pair(dummy, ok);
        } else {
            return std::pair(dummy, ok);
        }
    }
};

namespace boost::asio {
    template <typename T>
    struct is_match_condition<JsonReader<T>> : public boost::true_type { };
}

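This is peachy and works on the happy path. But you run into big trouble with edge/error cases:

  • you can't distinguish incomplete data from invalid data, so you MUST assume that unaccepted input is just incomplete (otherwise you would never wait for data to be complete)
  • you will wait until infinity for data to become "valid" if it's just invalid, or
  • worse still: keep reading indefinitely, possibly running out of memory (unless you limit the buffer size; this could lead to a DoS)
  • perhaps worst of all, if you read more data than the single JSON message (which you cannot in general prevent in the context of stream sockets), the original message will be rejected due to "excess input". Oops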

Testing

Here are the test cases that confirm the conclusions predicted by the analysis:

Live On Compiler Explorer

#include <boost/asio.hpp>
#include <nlohmann/json.hpp>
#include <cassert>
#include <iomanip>
#include <iostream>
#include <sstream>
#include <string>
#include <vector>

template <typename Buffer>
struct JsonReader {
    static_assert(boost::asio::is_dynamic_buffer_v1<Buffer>::value);
    Buffer _buf;
    nlohmann::json message;

    JsonReader() = default;
    JsonReader(Buffer buf) : _buf(buf) {}

    template <typename It>
    auto operator()(It dummy, It) {
        using namespace nlohmann;

        auto f = buffers_begin(_buf.data());
        auto l = buffers_end(_buf.data());
        bool ok = json::accept(f, l);
        if (ok) {
            auto n = [&] {
                std::istringstream iss(std::string(f, l));
                message = json::parse(iss);
                return iss.tellg(); // detect consumed
            }();

            _buf.consume(n);
            assert(n);
            //std::advance(dummy, n);
            return std::pair(dummy, ok);
        } else {
            return std::pair(dummy, ok);
        }
    }
};

namespace boost::asio {
    template <typename T>
    struct is_match_condition<JsonReader<T>> : public boost::true_type { };
}

static inline void run_tests() {
    std::vector<std::string> valid {
        R"({})",
        R"({"a":4, "b":5})",
        R"([])",
        R"([4, "b"])",
    },
    incomplete {
        R"({)",
        R"({"a":4, "b")",
        R"([)",
        R"([4, ")",
    },
    invalid {
        R"(})",
        R"("a":4 })",
        R"(])",
    },
    excess {
        R"({}{"a":4, "b":5})",
        R"([]["a", "b"])",
        R"({} bogus trailing data)",
    };

    auto run_tests = [&](auto& cases) {
        for (std::string buf : cases) {
            std::cout << "Testing " << std::left << std::setw(22) << buf;
            bool ok = JsonReader { boost::asio::dynamic_buffer(buf) }
                (buf.begin(), buf.end())
                .second;

            std::cout << " -> " << std::boolalpha << ok << std::endl;

            if (ok && !buf.empty()) {
                std::cout << " -- remaining buffer " << std::quoted(buf) << "\n";
            }
        }
    };

    std::cout << " ----- valid test cases \n";
    run_tests(valid);
    std::cout << " ----- incomplete test cases \n";
    run_tests(incomplete);
    std::cout << " ----- invalid test cases \n";
    run_tests(invalid);
    std::cout << " ----- excess input test cases \n";
    run_tests(excess);
}

template <typename SyncReadStream, typename Buffer>
static void read(SyncReadStream& s, Buffer bufarg, nlohmann::json& message) {
    using boost::asio::buffers_begin;
    using boost::asio::buffers_end;

    JsonReader reader{bufarg};
    read_until(s, bufarg, reader);
    message = reader.message;
}

int main() {
    run_tests();
}

Prints

 ----- valid test cases
Testing {}                     -> true
Testing {"a":4, "b":5}         -> true
Testing []                     -> true
Testing [4, "b"]               -> true
 ----- incomplete test cases
Testing {                      -> false
Testing {"a":4, "b"            -> false
Testing [                      -> false
Testing [4, "                  -> false
 ----- invalid test cases
Testing }                      -> false
Testing "a":4 }                -> false
Testing ]                      -> false
 ----- excess input test cases
Testing {}{"a":4, "b":5}       -> false
Testing []["a", "b"]           -> false
Testing {} bogus trailing data -> false

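Looking For Alternatives

You could roll your own, as I did in the past:

  • Parse a substring as JSON using QJsonDocument

Or we can look at another library that allows us to either detect the boundaries of valid JSON fragments OR detect and leave trailing input.

Hand-Rolled Approach

Here's a simple translation of that linked answer to the more modern Spirit X3: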

// Note: first iterator gets updated
// throws on known invalid input (like starting with `]' or '%')
template <typename It>
bool tryParseAsJson(It& f, It l)
{
    try {
        return detail::x3::parse(f, l, detail::json);
    } catch (detail::x3::expectation_failure<It> const& ef) {
        throw std::runtime_error("invalid JSON data");
    }
}

The crucial point is that this, *in addition to* returning true/false, updates the start iterator according to how far it consumed the input.

namespace JsonDetect {
    namespace detail {
        namespace x3 = boost::spirit::x3;
        static const x3::rule<struct value_> value{"value"};

        static auto primitive_token
            = x3::lexeme[ x3::lit("false") | "null" | "true" ];

        static auto expect_value
            = x3::rule<struct expect_value_> { "expect_value" }
            // array, object, string, number or other primitive_token
            = x3::expect[&(x3::char_("[{\"0-9.+-") | primitive_token | x3::eoi)]
            >> value
            ;

        // 2.4.  Numbers
        // Note our spirit grammar takes a shortcut, as the RFC specification is more restrictive:
        //
        // However none of the above affect any structure characters (:,{}[] and double quotes) so it doesn't
        // matter for the current purpose. For full compliance, this remains TODO:
        //
        //    Numeric values that cannot be represented as sequences of digits
        //    (such as Infinity and NaN) are not permitted.
        //     number = [ minus ] int [ frac ] [ exp ]
        //     decimal-point = %x2E       ; .
        //     digit1-9 = %x31-39         ; 1-9
        //     e = %x65 / %x45            ; e E
        //     exp = e [ minus / plus ] 1*DIGIT
        //     frac = decimal-point 1*DIGIT
        //     int = zero / ( digit1-9 *DIGIT )
        //     minus = %x2D               ; -
        //     plus = %x2B                ; +
        //     zero = %x30                ; 0
        static auto number = x3::double_; // shortcut :)

        // 2.5 Strings
        static const x3::uint_parser<uint32_t, 16, 4, 4> _4HEXDIG;

        static auto char_ = ~x3::char_("\"\\") |
               x3::char_(R"(\)") >> (       // \ (reverse solidus)
                   x3::char_(R"(")") |      // "    quotation mark  U+0022
                   x3::char_(R"(\)") |      // \    reverse solidus U+005C
                   x3::char_(R"(/)") |      // /    solidus         U+002F
                   x3::char_(R"(b)") |      // b    backspace       U+0008
                   x3::char_(R"(f)") |      // f    form feed       U+000C
                   x3::char_(R"(n)") |      // n    line feed       U+000A
                   x3::char_(R"(r)") |      // r    carriage return U+000D
                   x3::char_(R"(t)") |      // t    tab             U+0009
                   x3::char_(R"(u)") >> _4HEXDIG )  // uXXXX                U+XXXX
               ;

        static auto string = x3::lexeme [ '"' >> *char_ >> '"' ];

        // 2.2 objects
        static auto member
            = x3::expect [ &(x3::eoi | '"') ]
            >> string
            >> x3::expect [ x3::eoi | ':' ]
            >> expect_value;

        static auto object
            = '{' >> ('}' | (member % ',') >> '}');

        // 2.3 Arrays
        static auto array
            = '[' >> (']' | (expect_value % ',') >> ']');

        // 2.1 values
        static auto value_def = primitive_token | object | array | number | string;

        BOOST_SPIRIT_DEFINE(value)

        // entry point
        static auto json = x3::skip(x3::space)[expect_value];
    }  // namespace detail
}  // namespace JsonDetect

Obviously you would put the implementation in a TU, but on Compiler Explorer we can't: Live On Compiler Explorer. Using an adjusted JsonReader, this prints:

SeheX3Detector
==============
 ----- valid test cases 
Testing {}                     -> true
Testing {"a":4, "b":5}         -> true
Testing []                     -> true
Testing [4, "b"]               -> true
 ----- incomplete test cases 
Testing {                      -> false
Testing {"a":4, "b"            -> false
Testing [                      -> false
Testing [4, "                  -> false
 ----- invalid test cases 
Testing }                      -> invalid JSON data
Testing "a":4 }                -> true -- remaining `:4 }`
Testing ]                      -> invalid JSON data
 ----- excess input test cases 
Testing {}{"a":4, "b":5}       -> true -- remaining `{"a":4, "b":5}`
Testing []["a", "b"]           -> true -- remaining `["a", "b"]`
Testing {} bogus trailing data -> true -- remaining ` bogus trailing data`

NlohmannDetector
================
 ----- valid test cases 
Testing {}                     -> true
Testing {"a":4, "b":5}         -> true
Testing []                     -> true
Testing [4, "b"]               -> true
 ----- incomplete test cases 
Testing {                      -> false
Testing {"a":4, "b"            -> false
Testing [                      -> false
Testing [4, "                  -> false
 ----- invalid test cases 
Testing }                      -> false
Testing "a":4 }                -> false
Testing ]                      -> false
 ----- excess input test cases 
Testing {}{"a":4, "b":5}       -> false
Testing []["a", "b"]           -> false
Testing {} bogus trailing data -> false

Note how we now achieved some of the goals.

  • accepting trailing data - so we don't clobber any data after our message
  • failing early on some inputs that cannot possibly become valid JSON
  • However, we can't fix the problem of waiting indefinitely on /possibly/ incomplete valid data
  • Interestingly, one of our "invalid" test cases was wrong (!). (It is always a good sign when test cases fail). This is because "a" is actually a valid JSON value on its own.
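The "report how much was consumed and leave the rest" idea behind these goals can also be illustrated without Spirit. What follows is a deliberately simplified sketch, not the X3 grammar: it only tracks bracket depth and string/escape state, so it finds where the first top-level object or array ends but does not validate the JSON and does not handle top-level scalars such as "a". The name first_value_length is hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <stdexcept>
#include <string_view>

// Hypothetical helper: returns the number of characters occupied by the first
// top-level JSON object or array in `in` (leading whitespace included), or
// std::nullopt when its closing bracket has not arrived yet.
// Throws std::invalid_argument on a closing bracket with nothing open.
// It does NOT check that '{' is closed by '}' rather than ']'.
inline std::optional<std::size_t> first_value_length(std::string_view in) {
    int depth = 0;
    bool in_string = false;
    bool escaped = false;
    for (std::size_t i = 0; i < in.size(); ++i) {
        char const c = in[i];
        if (in_string) {
            if (escaped)        escaped = false;   // char after a backslash is inert
            else if (c == '\\') escaped = true;
            else if (c == '"')  in_string = false;
            continue;                              // brackets inside strings don't count
        }
        switch (c) {
        case '"': in_string = true; break;
        case '{': case '[': ++depth; break;
        case '}': case ']':
            if (depth == 0)
                throw std::invalid_argument("unexpected closing bracket");
            if (--depth == 0)
                return i + 1;                      // one past the closing bracket
            break;
        default: break;
        }
    }
    return std::nullopt;                           // still open: need more data
}
```

The caller would hand the first n characters to a real parser for validation and keep everything after them in the buffer for the next message.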

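Conclusion

In the general case it is impossible to do such "complete message" detection without at least limiting the buffer size. E.g. a valid input could start with a million spaces. You don't want to wait for that.

Also, a valid input could open a string, object or array², and not terminate it within a few gigabytes. If you stop parsing beforehand, you will never know whether it ultimately was a valid message.

Though you will inevitably have to deal with network timeouts anyway, you will prefer to be proactive about knowing what to expect. E.g. send the size of the payload ahead, so you can use boost::asio::transfer_exactly and validate precisely what you expected to get.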
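As an illustration of sending the size ahead, here is a minimal sketch of a length-prefixed frame. The wire format is an assumption made for this example (a 4-byte big-endian payload length followed by the payload), and encode_frame / decode_frame are hypothetical names. On a socket, the receiver would presumably read the 4 header bytes first and then read the payload with boost::asio::transfer_exactly(n); the sketch below works on an in-memory buffer so it stays self-contained.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <string>

// Assumed wire format (illustration only): 4-byte big-endian payload length,
// followed by exactly that many payload bytes.
inline std::string encode_frame(std::string const& payload) {
    auto const n = static_cast<std::uint32_t>(payload.size());
    std::string frame;
    frame.reserve(4 + payload.size());
    for (int shift = 24; shift >= 0; shift -= 8)
        frame.push_back(static_cast<char>((n >> shift) & 0xFF));
    frame += payload;
    return frame;
}

// Removes one complete frame from the front of `buffer` and returns its
// payload. Returns std::nullopt, leaving `buffer` untouched, if the header
// or the payload has not fully arrived yet.
inline std::optional<std::string> decode_frame(std::string& buffer) {
    if (buffer.size() < 4)
        return std::nullopt;                       // header incomplete
    std::uint32_t n = 0;
    for (int i = 0; i < 4; ++i)
        n = (n << 8) | static_cast<unsigned char>(buffer[i]);
    if (buffer.size() - 4 < n)
        return std::nullopt;                       // payload incomplete
    std::string payload = buffer.substr(4, n);
    buffer.erase(0, 4 + static_cast<std::size_t>(n));
    return payload;
}
```

With the boundary known up front, an invalid payload can be rejected immediately instead of being mistaken for an incomplete one. A real receiver should also reject any announced length above some maximum before waiting for the payload, for the buffer-size reasons given above.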


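¹ Practically speaking. If you don't care about performance, you could iteratively run accept on increasing lengths of the buffer.

² God forbid, a number like 0000....00001, though that is subject to parser implementation differences.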

Of course, after posting my other answer I remembered that Boost accepted Boost JSON in 1.75.0.

It does stream parsing way more gracefully: https://www.boost.org/doc/libs/1_75_0/libs/json/doc/html/json/ref/boost__json__stream_parser.html#json.ref.boost__json__stream_parser.usage

It actually deals with trailing data as well!

stream_parser p;                  // construct a parser
std::size_t n;                    // number of characters used
n = p.write_some( "[1,2" );       // parse some of a JSON
assert( n == 4 );                 // all characters consumed
n = p.write_some( ",3,4] null" ); // parse the remainder of the JSON
assert( n == 6 );                 // only some characters consumed
assert( p.done() );               // we have a complete JSON
value jv = p.release();           // take ownership of the value

I also think this might be a better fit for a CompletionCondition: see https://www.boost.org/doc/libs/1_75_0/doc/html/boost_asio/reference/read/overload3.html

Here's an implementation that I tested with:

template <typename Buffer, typename SyncReadStream>
static size_t read_json(SyncReadStream& s, Buffer buf,
    boost::json::value& message, boost::json::parse_options options = {})
{
    boost::json::stream_parser p{{}, options};

    size_t total_parsed = 0;
    boost::asio::read(s, buf, [&](boost::system::error_code ec, size_t /*n*/) {
        size_t parsed = 0;

        for (auto& contiguous : buf.data()) {
            parsed += p.write_some(
                boost::asio::buffer_cast<char const*>(contiguous),
                contiguous.size(), ec);
        }
        buf.consume(parsed);
        total_parsed += parsed;
        return ec || p.done(); // true means done
    });

    message = p.release(); // throws if incomplete
    return total_parsed;
}

Adding a delegating overload for streambuf:

template <typename SyncReadStream, typename Alloc>
static size_t read_json(SyncReadStream& s,
    boost::asio::basic_streambuf<Alloc>& buf,
    boost::json::value& message,
    boost::json::parse_options options = {})
{
    return read_json(s, boost::asio::basic_streambuf_ref<Alloc>(buf), message, options);
}

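Demo Program

This demo program adds the test-cases from earlier as well as a socket client with some benchmark statistics added. Arguments:

  • test to run the tests instead of the socket client
  • streambuf to use the streambuf overload instead of the std::string dynamic buffer
  • comments to allow comments in the JSON
  • trailing_commas to allow trailing commas in the JSON
  • invalid_utf8 to allow invalid utf8 in the JSON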

Live On Compiler Explorer¹

#include <boost/spirit/home/x3.hpp>
#include <boost/fusion/adapted.hpp>
#include <iomanip>
#include <iostream>
namespace x3 = boost::spirit::x3;

int main() {
    std::string const s = 
        "? 8==2 : true ! false"
        "? 9==3 : 'book' ! 'library'";

    using expression = std::string;
    using ternary = std::tuple<expression, expression, expression>;
    std::vector<ternary> parsed;

    auto expr_ = x3::lexeme [+~x3::char_("?:!")];
    auto ternary_ = "?" >> expr_ >> ":" >> expr_ >> "!" >> expr_;

    std::cout << "=== parser approach:\n";
    if (x3::phrase_parse(begin(s), end(s), *x3::seek[ ternary_ ], x3::space, parsed)) {

        for (auto [cond, e1, e2] : parsed) {
            std::cout
                << " condition " << std::quoted(cond) << "\n"
                << " true expression " << std::quoted(e1) << "\n"
                << " else expression " << std::quoted(e2) << "\n"
                << "\n";
        }
    } else {
        std::cout << "non matching" << '\n';
    }
}

With test, it prints:

 ----- valid test cases
Testing {}                     -> Success {}
Testing {"a":4, "b":5}         -> Success {"a":4,"b":5}
Testing []                     -> Success []
Testing [4, "b"]               -> Success [4,"b"]
 ----- incomplete test cases
Testing {                      -> (incomplete...)
Testing {"a":4, "b"            -> (incomplete...)
Testing [                      -> (incomplete...)
Testing [4, "                  -> (incomplete...)
 ----- invalid test cases
Testing }                      -> syntax error
Testing "a":4 }                -> Success "a" -- remaining `:4 }`
Testing ]                      -> syntax error
 ----- excess input test cases
Testing {}{"a":4, "b":5}       -> Success {} -- remaining `{"a":4, "b":5}`
Testing []["a", "b"]           -> Success [] -- remaining `["a", "b"]`
Testing {} bogus trailing data -> Success {} -- remaining `bogus trailing data`

Some demos using the socket client:

Mean packet size: 16 in 2 packets
Request: 28 bytes
Request: {"a":4,"b":"5"} bytes
Remaining data: "bye
"
took 0.000124839s, ~0.213899MiB/s

A large (448MiB) location_history.json:

Mean packet size: 511.999 in 917791 packets
Request: 469908167 bytes
 (large request output suppressed)
took 3.30509s, ~135.59MiB/s


¹ Link only: linking non-header-only libraries is not supported on Compiler Explorer.