如何使用 Proxygen 和 Folly 发送 HTTP 分块响应来模拟视频流?

How to send HTTP chunked response to emulate a video stream using Proxygen and Folly?

我正在编写一个基于 Facebook 的 Proxygen 的 HTTP 视频流服务器。没有寻求计划。使用 proxygen::ResponseBuilder 我能够发送 webm 编码视频的块作为 HTTP 响应,即分块传输编码正在工作。我的问题是,Proxygen 在发送响应 headers 之前等待 proxygen::ResponseBuilder::sendWithEOM()。我希望它在每次调用 proxygen::ResponseBuilder::send() 后尽快发送数据。

我尝试 运行 使用 evb->runInLoop()evb->runInEventBaseThread()

从 EventBaseThread 执行的 lambda 调用 ResponseBuilder
using namespace folly;
using namespace proxygen;

std::thread t([&](){
    EventBase* evb = EventBaseManager::get()->getExistingEventBase();    
    // send headers ...
    while ( chunks avail. ) {
        //...
        evb->runInLoop([&](){
            ResponseBuilder(downstream_)
                     .body(std::move(chunk))
                     .send();
        });
        //... 
    }
    // sendWithEOM ...
});
t.detach();

此代码是从我的 RequestHandleronRequest() 方法中调用的。我试图调用 ResponseBuilder::send() 而不将其包装到 evb->runInLoop(),但是带有 Folly v0.42.0 的 Proxygen v0.25.0 禁止使用断言从另一个线程调用 ResponseBuilder::send()。我从这里删除了这个断言:https://github.com/facebook/folly/blob/v0.42.0/folly/io/async/EventBase.cpp#L491.

现在模拟流可以正常工作,但如果有并行请求,它就会崩溃。我想它不应该像这样使用,这就是断言的目的。但也许有人知道如何为我的用例正确使用 Proxygen 基础设施?

using namespace folly;
using namespace proxygen;

//Get Event Base in worker thread and pass it to new thread. If you call this inside new thread then you won't get worker thread's event base.
EventBase* evb = EventBaseManager::get()->getExistingEventBase();   
std::thread t([&, evb](){

// send headers ...
while ( chunks avail. ) {
    //...
    evb->runInLoop([&](){
        ResponseBuilder(downstream_)
                 .body(std::move(chunk))
                 .send();
    });
    //... 
}
// sendWithEOM ...
});
t.detach();

因此无需在 EventBase 源中注释断言。

这也有同样的问题。我用这样的东西让它工作。

folly::EventBase* eventBase = folly::EventBaseManager::get()->getExistingEventBase();
thread t([&, eventBase]() {
    while( chunks exist ) {
        auto chunk = getChunk();    
        eventBase->runInEventBaseThread([&, chunk=chunk]() mutable {
            ResponseBuilder(downstream_).body(move(chunk)).send();
        });
    }
});
// sendWithEOM ...
t.detach();