Rust Hyper/axum 具有计算密集型和高吞吐量的网络服务器

Rust Hyper/axum webserver with computation intensive and high throughput

我有一个例子,我有一个带有 axum 的 http 服务器,它以非常高的吞吐量(可以达到每秒 20m)接收负载。 我需要从请求中获取这些字节并对它们进行一些繁重的计算。 内存达到不可预测的高(可能达到 5Gb)的问题。 这是关于我如何实现它的当前设置:

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let (tx, mut rx) = mpsc::channel::<WriteRequest>(32);

    tokio::spawn(async move {
      while let Some(payload) = rx.recv().await {
        tokio::task::spawn_blocking(move || {
          // Run heavy computation here...
          heavy_computation(payload)
        }).await;
      }
    });

    // build our application with a route
    let app = Router::new()
        .route("/write", post(move |req: Bytes| async move {
           let data: WriteRequest = Message::decode(req);
           // send data here
           let _ = tx.send(data).await;

           "ok"
         }));

    let addr = ([0, 0, 0, 0], 8080).into();
    let server = axum::Server::bind(&addr)
        .serve(app.into_make_service());
    
    if let Err(e) = server.await {
        error!("server error: {}", e);
    }
    
    Ok(())
}

我认为是 bounded channel 上的背压使请求不断堆积,直到它们可以发送到另一个 task 进行处理,从而导致高内存。 因为即使我尝试用一​​个简单的 sleep 替换 heavy_copmutation 大约 200ms,它也会以相同的结果结束。 如果我删除 heavy_computation 部分,内存会保持低水平。

解决此类问题的正确方法是什么?或者对于这种高吞吐量,这里什么也做不了?

非常感谢!

感觉就像 heavy_computation 很忙的时候,数以百万计的待处理请求堆积如山。需要限制一次处理的接受数 connections/requests。要达到 5Gb 的使用量,只需要 25K 的挂起请求和 200Kb 的有效负载,甚至不到数百万。

axum 基于 tower, and tower is based on hyper.

hyper doesn't have a max connections setting 是一个已知问题,但那里的人建议使用来自塔的 ConcurrencyLimit 中间件,并将其配置到服务器中,或者进行自定义 accept/handle 循环。

也许也可以通过 axum 将此中间件传递到塔中,但如果它是您的一个选项,您可以尝试直接转到塔或什至裸 hyper,并使用那里可用的原语实现它这个工作量。