nghttp2:使用服务器发送的事件供 EventSource 使用
nghttp2: Using server-sent events to be use by EventSource
我正在使用 nghttp2 来实现一个 REST 服务器,它应该使用 HTTP/2 和服务器发送的事件(由浏览器中的 EventSource 使用)。但是,根据示例,我不清楚如何实施 SSE。在 asio-sv.cc
中使用 res.push() 似乎不是正确的方法。
正确的做法是什么?我更喜欢使用 nghttp2 的 C++ API,但 C API 也可以。
是的,我在 2018 年做过类似的事情。文档相当稀疏:)。
首先,请忽略 response::push
,因为那是 HTTP2 推送——用于在客户端请求它们之前主动向客户端发送未经请求的对象。我知道这听起来像您所需要的,但事实并非如此——典型的用例是主动发送 CSS 文件和一些图像以及最初请求的 HTML 页面。
关键是当您 运行 没有数据要发送时,您的 end()
回调最终必须 return NGHTTP2_ERR_DEFERRED
。当您的应用程序以某种方式获得更多要发送的数据时,请调用 http::response::resume()
.
这是一个简单的代码。将其构建为 g++ -std=c++17 -Wall -O3 -ggdb clock.cpp -lssl -lcrypto -pthread -lnghttp2_asio -lspdlog -lfmt
。请注意,现代浏览器不会在明文套接字上执行 HTTP/2,因此您需要通过 nghttpx -f '*,8080;no-tls' -b '::1,10080;;proto=h2'
.
之类的东西对其进行反向代理
#include <boost/asio/io_service.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/signals2.hpp>
#include <chrono>
#include <list>
#include <nghttp2/asio_http2_server.h>
#define SPDLOG_FMT_EXTERNAL
#include <spdlog/spdlog.h>
#include <thread>
using namespace nghttp2::asio_http2;
using namespace std::literals;
using Signal = boost::signals2::signal<void(const std::string& message)>;
class Client {
const server::response& res;
enum State {
HasEvents,
WaitingForEvents,
};
std::atomic<State> state;
std::list<std::string> queue;
mutable std::mutex mtx;
boost::signals2::scoped_connection subscription;
size_t send_chunk(uint8_t* destination, std::size_t len, uint32_t* data_flags [[maybe_unused]])
{
std::size_t written{0};
std::lock_guard lock{mtx};
if (state != HasEvents) throw std::logic_error{std::to_string(__LINE__)};
while (!queue.empty()) {
auto num = std::min(queue.front().size(), len - written);
std::copy_n(queue.front().begin(), num, destination + written);
written += num;
if (num < queue.front().size()) {
queue.front() = queue.front().substr(num);
spdlog::debug("{} send_chunk: partial write", (void*)this);
return written;
}
queue.pop_front();
spdlog::debug("{} send_chunk: sent one event", (void*)this);
}
state = WaitingForEvents;
return written;
}
public:
Client(const server::request& req, const server::response& res, Signal& signal)
: res{res}
, state{WaitingForEvents}
, subscription{signal.connect([this](const auto& msg) {
enqueue(msg);
})}
{
spdlog::warn("{}: {} {} {}", (void*)this, boost::lexical_cast<std::string>(req.remote_endpoint()), req.method(), req.uri().raw_path);
res.write_head(200, {{"content-type", {"text/event-stream", false}}});
}
void onClose(const uint32_t ec)
{
spdlog::error("{} onClose", (void*)this);
subscription.disconnect();
}
ssize_t process(uint8_t* destination, std::size_t len, uint32_t* data_flags)
{
spdlog::trace("{} process", (void*)this);
switch (state) {
case HasEvents:
return send_chunk(destination, len, data_flags);
case WaitingForEvents:
return NGHTTP2_ERR_DEFERRED;
}
__builtin_unreachable();
}
void enqueue(const std::string& what)
{
{
std::lock_guard lock{mtx};
queue.push_back("data: " + what + "\n\n");
}
state = HasEvents;
res.resume();
}
};
int main(int argc [[maybe_unused]], char** argv [[maybe_unused]])
{
spdlog::set_level(spdlog::level::trace);
Signal sig;
std::thread timer{[&sig]() {
for (int i = 0; /* forever */; ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds{666});
spdlog::info("tick: {}", i);
sig("ping #" + std::to_string(i));
}
}};
server::http2 server;
server.num_threads(4);
server.handle("/events", [&sig](const server::request& req, const server::response& res) {
auto client = std::make_shared<Client>(req, res, sig);
res.on_close([client](const auto ec) {
client->onClose(ec);
});
res.end([client](uint8_t* destination, std::size_t len, uint32_t* data_flags) {
return client->process(destination, len, data_flags);
});
});
server.handle("/", [](const auto& req, const auto& resp) {
spdlog::warn("{} {} {}", boost::lexical_cast<std::string>(req.remote_endpoint()), req.method(), req.uri().raw_path);
resp.write_head(200, {{"content-type", {"text/html", false}}});
resp.end(R"(<html><head><title>nghttp2 event stream</title></head>
<body><h1>events</h1><ul id="x"></ul>
<script type="text/javascript">
const ev = new EventSource("/events");
ev.onmessage = function(event) {
const li = document.createElement("li");
li.textContent = event.data;
document.getElementById("x").appendChild(li);
};
</script>
</body>
</html>)");
});
boost::system::error_code ec;
if (server.listen_and_serve(ec, "::", "10080")) {
return 1;
}
return 0;
}
我感觉我的队列处理可能太复杂了。通过 curl
进行测试时,我似乎从未 运行 超出缓冲区 space。换句话说,即使客户端没有从套接字读取任何数据,库也会继续调用 send_chunk
,一次最多为我请求 16kB 的数据。奇怪的。我不知道在更大量地推送更多数据时它是如何工作的。
我的“真实代码”曾经有第三种状态,Closed
,但我认为通过 on_close
阻止事件在这里就足够了。但是,我认为如果客户端已经断开连接,但在调用析构函数之前,您永远不想输入 send_chunk
。
我正在使用 nghttp2 来实现一个 REST 服务器,它应该使用 HTTP/2 和服务器发送的事件(由浏览器中的 EventSource 使用)。但是,根据示例,我不清楚如何实施 SSE。在 asio-sv.cc
中使用 res.push() 似乎不是正确的方法。
正确的做法是什么?我更喜欢使用 nghttp2 的 C++ API,但 C API 也可以。
是的,我在 2018 年做过类似的事情。文档相当稀疏:)。
首先,请忽略 response::push
,因为那是 HTTP2 推送——用于在客户端请求它们之前主动向客户端发送未经请求的对象。我知道这听起来像您所需要的,但事实并非如此——典型的用例是主动发送 CSS 文件和一些图像以及最初请求的 HTML 页面。
关键是当您 运行 没有数据要发送时,您的 end()
回调最终必须 return NGHTTP2_ERR_DEFERRED
。当您的应用程序以某种方式获得更多要发送的数据时,请调用 http::response::resume()
.
这是一个简单的代码。将其构建为 g++ -std=c++17 -Wall -O3 -ggdb clock.cpp -lssl -lcrypto -pthread -lnghttp2_asio -lspdlog -lfmt
。请注意,现代浏览器不会在明文套接字上执行 HTTP/2,因此您需要通过 nghttpx -f '*,8080;no-tls' -b '::1,10080;;proto=h2'
.
#include <boost/asio/io_service.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/signals2.hpp>
#include <chrono>
#include <list>
#include <nghttp2/asio_http2_server.h>
#define SPDLOG_FMT_EXTERNAL
#include <spdlog/spdlog.h>
#include <thread>
using namespace nghttp2::asio_http2;
using namespace std::literals;
using Signal = boost::signals2::signal<void(const std::string& message)>;
class Client {
const server::response& res;
enum State {
HasEvents,
WaitingForEvents,
};
std::atomic<State> state;
std::list<std::string> queue;
mutable std::mutex mtx;
boost::signals2::scoped_connection subscription;
size_t send_chunk(uint8_t* destination, std::size_t len, uint32_t* data_flags [[maybe_unused]])
{
std::size_t written{0};
std::lock_guard lock{mtx};
if (state != HasEvents) throw std::logic_error{std::to_string(__LINE__)};
while (!queue.empty()) {
auto num = std::min(queue.front().size(), len - written);
std::copy_n(queue.front().begin(), num, destination + written);
written += num;
if (num < queue.front().size()) {
queue.front() = queue.front().substr(num);
spdlog::debug("{} send_chunk: partial write", (void*)this);
return written;
}
queue.pop_front();
spdlog::debug("{} send_chunk: sent one event", (void*)this);
}
state = WaitingForEvents;
return written;
}
public:
Client(const server::request& req, const server::response& res, Signal& signal)
: res{res}
, state{WaitingForEvents}
, subscription{signal.connect([this](const auto& msg) {
enqueue(msg);
})}
{
spdlog::warn("{}: {} {} {}", (void*)this, boost::lexical_cast<std::string>(req.remote_endpoint()), req.method(), req.uri().raw_path);
res.write_head(200, {{"content-type", {"text/event-stream", false}}});
}
void onClose(const uint32_t ec)
{
spdlog::error("{} onClose", (void*)this);
subscription.disconnect();
}
ssize_t process(uint8_t* destination, std::size_t len, uint32_t* data_flags)
{
spdlog::trace("{} process", (void*)this);
switch (state) {
case HasEvents:
return send_chunk(destination, len, data_flags);
case WaitingForEvents:
return NGHTTP2_ERR_DEFERRED;
}
__builtin_unreachable();
}
void enqueue(const std::string& what)
{
{
std::lock_guard lock{mtx};
queue.push_back("data: " + what + "\n\n");
}
state = HasEvents;
res.resume();
}
};
int main(int argc [[maybe_unused]], char** argv [[maybe_unused]])
{
spdlog::set_level(spdlog::level::trace);
Signal sig;
std::thread timer{[&sig]() {
for (int i = 0; /* forever */; ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds{666});
spdlog::info("tick: {}", i);
sig("ping #" + std::to_string(i));
}
}};
server::http2 server;
server.num_threads(4);
server.handle("/events", [&sig](const server::request& req, const server::response& res) {
auto client = std::make_shared<Client>(req, res, sig);
res.on_close([client](const auto ec) {
client->onClose(ec);
});
res.end([client](uint8_t* destination, std::size_t len, uint32_t* data_flags) {
return client->process(destination, len, data_flags);
});
});
server.handle("/", [](const auto& req, const auto& resp) {
spdlog::warn("{} {} {}", boost::lexical_cast<std::string>(req.remote_endpoint()), req.method(), req.uri().raw_path);
resp.write_head(200, {{"content-type", {"text/html", false}}});
resp.end(R"(<html><head><title>nghttp2 event stream</title></head>
<body><h1>events</h1><ul id="x"></ul>
<script type="text/javascript">
const ev = new EventSource("/events");
ev.onmessage = function(event) {
const li = document.createElement("li");
li.textContent = event.data;
document.getElementById("x").appendChild(li);
};
</script>
</body>
</html>)");
});
boost::system::error_code ec;
if (server.listen_and_serve(ec, "::", "10080")) {
return 1;
}
return 0;
}
我感觉我的队列处理可能太复杂了。通过 curl
进行测试时,我似乎从未 运行 超出缓冲区 space。换句话说,即使客户端没有从套接字读取任何数据,库也会继续调用 send_chunk
,一次最多为我请求 16kB 的数据。奇怪的。我不知道在更大量地推送更多数据时它是如何工作的。
我的“真实代码”曾经有第三种状态,Closed
,但我认为通过 on_close
阻止事件在这里就足够了。但是,我认为如果客户端已经断开连接,但在调用析构函数之前,您永远不想输入 send_chunk
。