如何为 actix websocket 服务器配置负载限制
How to config payload limit for a actix websocket server
我正在学习 Actix 并尝试创建 WebSocket 服务
代码片段:
启动服务器
pub async fn start(addr: &str) -> std::result::Result<(), IoError> {
let connections = Connections::default().start();
HttpServer::new(move || {
App::new().service(
web::resource("/ws/")
.data(connections.clone())
.route(web::get().to(ws_index)),
)
})
.bind(addr)?
.run()
.await
}
处理程序
async fn ws_index(
req: HttpRequest,
stream: web::Payload,
addr: web::Data<Addr<Connections>>,
) -> Result<HttpResponse, Error> {
let id = Uuid::new_v4().to_simple().to_string();
let client = Connection::new(id, addr.get_ref().clone());
let resp = ws::start(client, &req, stream);
resp
}
流处理器
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for Connection {
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
match msg {
Ok(ws::Message::Ping(msg)) => ctx.pong(&msg),
Ok(ws::Message::Pong(_)) => self.hb = Instant::now(),
Ok(ws::Message::Text(text)) => ctx.text(text),
Ok(ws::Message::Close(reason)) => {
ctx.stop();
println!("Connection {} closed with reason {:?}", self.id, reason);
}
Err(e) => println!("Error: {}", e),
_ => (),
}
}
}
现在 WebSocket 服务器 运行,它可以接收文本并将其发回。但是如果我发送大文本,服务器会记录“错误:有效载荷达到大小限制”。如何解决?
您必须使用特定的编解码器创建 WebSocket,而不是使用 ws::start()
,因为这是您可以设置有效负载大小的地方 https://docs.rs/actix-http/2.2.0/actix_http/ws/struct.Codec.html。
所以创建你的启动函数:
use actix_web::error::PayloadError;
use ws::{handshake, WebsocketContext};
use actix_http::ws::{Codec, Message, ProtocolError};
use bytes::Bytes;
fn start_with_codec<A, S>(actor: A, req: &HttpRequest, stream: S, codec: Codec) -> Result<HttpResponse, Error>
where
A: Actor<Context = WebsocketContext<A>>
+ StreamHandler<Result<Message, ProtocolError>>,
S: Stream<Item = Result<Bytes, PayloadError>> + 'static,
{
let mut res = handshake(req)?;
Ok(res.streaming(WebsocketContext::with_codec(actor, stream, codec)))
}
然后您更改代码以调用新函数并传递编解码器:
async fn ws_index(
req: HttpRequest,
stream: web::Payload,
addr: web::Data<Addr<Connections>>,
) -> Result<HttpResponse, Error> {
let id = Uuid::new_v4().to_simple().to_string();
let client = Connection::new(id, addr.get_ref().clone());
let resp = start_with_codec(client, &req, stream, Codec::new().max_size(MAX_SIZE));
resp
}
已更新
在大负载上,消息可以通过 Message::Continuation
变体分解成几个较小的片段。
此变体包含具有以下变体的项目枚举:
pub enum Item {
FirstText(Bytes),
FirstBinary(Bytes),
Continue(Bytes),
Last(Bytes),
}
要重组原始消息,您必须从 FirstText / FirstBinary 收集所有片段,直到最后一个,或者确保以正确的顺序将所有这些消息转发到将检索消息的目的地。
从 actix web v4 开始,使用 WsResponseBuilder.
可以轻松配置 websocket
您现在可以:
async fn ws_index(
req: HttpRequest,
stream: web::Payload,
addr: web::Data<Addr<Connections>>,
) -> Result<HttpResponse, Error> {
let id = Uuid::new_v4().to_simple().to_string();
let client = Connection::new(id, addr.get_ref().clone());
ws::WsResponseBuilder::new(client, &req, stream)
.frame_size(MAX_SIZE)
.start()
}
我正在学习 Actix 并尝试创建 WebSocket 服务
代码片段:
启动服务器
pub async fn start(addr: &str) -> std::result::Result<(), IoError> {
let connections = Connections::default().start();
HttpServer::new(move || {
App::new().service(
web::resource("/ws/")
.data(connections.clone())
.route(web::get().to(ws_index)),
)
})
.bind(addr)?
.run()
.await
}
处理程序
async fn ws_index(
req: HttpRequest,
stream: web::Payload,
addr: web::Data<Addr<Connections>>,
) -> Result<HttpResponse, Error> {
let id = Uuid::new_v4().to_simple().to_string();
let client = Connection::new(id, addr.get_ref().clone());
let resp = ws::start(client, &req, stream);
resp
}
流处理器
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for Connection {
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
match msg {
Ok(ws::Message::Ping(msg)) => ctx.pong(&msg),
Ok(ws::Message::Pong(_)) => self.hb = Instant::now(),
Ok(ws::Message::Text(text)) => ctx.text(text),
Ok(ws::Message::Close(reason)) => {
ctx.stop();
println!("Connection {} closed with reason {:?}", self.id, reason);
}
Err(e) => println!("Error: {}", e),
_ => (),
}
}
}
现在 WebSocket 服务器 运行,它可以接收文本并将其发回。但是如果我发送大文本,服务器会记录“错误:有效载荷达到大小限制”。如何解决?
您必须使用特定的编解码器创建 WebSocket,而不是使用 ws::start()
,因为这是您可以设置有效负载大小的地方 https://docs.rs/actix-http/2.2.0/actix_http/ws/struct.Codec.html。
所以创建你的启动函数:
use actix_web::error::PayloadError;
use ws::{handshake, WebsocketContext};
use actix_http::ws::{Codec, Message, ProtocolError};
use bytes::Bytes;
fn start_with_codec<A, S>(actor: A, req: &HttpRequest, stream: S, codec: Codec) -> Result<HttpResponse, Error>
where
A: Actor<Context = WebsocketContext<A>>
+ StreamHandler<Result<Message, ProtocolError>>,
S: Stream<Item = Result<Bytes, PayloadError>> + 'static,
{
let mut res = handshake(req)?;
Ok(res.streaming(WebsocketContext::with_codec(actor, stream, codec)))
}
然后您更改代码以调用新函数并传递编解码器:
async fn ws_index(
req: HttpRequest,
stream: web::Payload,
addr: web::Data<Addr<Connections>>,
) -> Result<HttpResponse, Error> {
let id = Uuid::new_v4().to_simple().to_string();
let client = Connection::new(id, addr.get_ref().clone());
let resp = start_with_codec(client, &req, stream, Codec::new().max_size(MAX_SIZE));
resp
}
已更新
在大负载上,消息可以通过 Message::Continuation
变体分解成几个较小的片段。
此变体包含具有以下变体的项目枚举:
pub enum Item {
FirstText(Bytes),
FirstBinary(Bytes),
Continue(Bytes),
Last(Bytes),
}
要重组原始消息,您必须从 FirstText / FirstBinary 收集所有片段,直到最后一个,或者确保以正确的顺序将所有这些消息转发到将检索消息的目的地。
从 actix web v4 开始,使用 WsResponseBuilder.
可以轻松配置 websocket您现在可以:
async fn ws_index(
req: HttpRequest,
stream: web::Payload,
addr: web::Data<Addr<Connections>>,
) -> Result<HttpResponse, Error> {
let id = Uuid::new_v4().to_simple().to_string();
let client = Connection::new(id, addr.get_ref().clone());
ws::WsResponseBuilder::new(client, &req, stream)
.frame_size(MAX_SIZE)
.start()
}