如何使用 actix-web 在 websocket 处理程序中启动守护进程?
How can I launch a daemon in a websocket handler with actix-web?
鉴于 WebSocket server with Actix 的基本设置,我如何在我的消息处理程序中启动守护进程?
我已经扩展了上面链接的示例起始代码,以使用 fork crate 调用 daemon(false, true)
。
use actix::{Actor, StreamHandler};
use actix_web::{web, App, Error, HttpRequest, HttpResponse, HttpServer};
use actix_web_actors::ws;
use fork::{daemon, Fork};
/// Define HTTP actor
struct MyWs;
impl Actor for MyWs {
type Context = ws::WebsocketContext<Self>;
}
/// Handler for ws::Message message
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for MyWs {
fn handle(
&mut self,
msg: Result<ws::Message, ws::ProtocolError>,
ctx: &mut Self::Context,
) {
match msg {
Ok(ws::Message::Ping(msg)) => ctx.pong(&msg),
Ok(ws::Message::Text(text)) => {
println!("text message received");
if let Ok(Fork::Child) = daemon(false, true) {
println!("from daemon: this print but then the websocket crashes!");
};
ctx.text(text)
},
Ok(ws::Message::Binary(bin)) => ctx.binary(bin),
_ => (),
}
}
}
async fn index(req: HttpRequest, stream: web::Payload) -> Result<HttpResponse, Error> {
let resp = ws::start(MyWs {}, &req, stream);
println!("{:?}", resp);
resp
}
#[actix_web::main]
async fn main() -> std::io::Result<()> {
HttpServer::new(|| App::new().route("/ws/", web::get().to(index)))
.bind("127.0.0.1:8080")?
.run()
.await
}
上面的代码启动了服务器,但是当我向它发送消息时,我收到了 Panic in Arbiter thread
。
text message received
from daemon: this print but then the websocket crashes!
thread 'actix-rt:worker:0' panicked at 'failed to park', /Users/xxx/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.25/src/runtime/basic_scheduler.rs:158:56
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
Panic in Arbiter thread.
您的应用程序的问题是 actix-web 运行 时间(即 Tokio)是 multi-threaded。这是一个问题,因为 fork()
调用(由 daemon()
内部使用)仅复制调用 fork()
.
的线程
即使您的 parent 进程有 N 个线程,您的 child 进程也只有 1 个。如果您的 parent 进程有任何被这些线程锁定的互斥锁,它们的状态将是在 child 进程中复制,但由于这些线程不存在,它们将永远保持锁定状态。
如果你有一个 Rc
/Arc
它永远不会 de-allocate 它的内存,因为它永远不会被删除,因此它的内部计数永远不会达到零。这同样适用于任何指针和共享状态。
或者更简单地说 - 你的分叉 child 将以未定义状态结束。
这在 Calling fork() in a Multithreaded Environment 中得到了最好的解释:
The fork( ) system call creates an exact duplicate of the address
space from which it is called, resulting in two address spaces
executing the same code. Problems can occur if the forking address
space has multiple threads executing at the time of the fork( ). When
multithreading is a result of library invocation, threads are not
necessarily aware of each other's presence, purpose, actions, and so
on. Suppose that one of the other threads (any thread other than the
one doing the fork( )) has the job of deducting money from your
checking account. Clearly, you do not want this to happen twice as a
result of some other thread's decision to call fork( ).
Because of these types of problems, which in general are problems of
threads modifying persistent state, POSIX defined the behavior of
fork( ) in the presence of threads to propagate only the forking
thread. This solves the problem of improper changes being made to
persistent state. However, it causes other problems, as discussed in
the next paragraph.
In the POSIX model, only the forking thread is propagated. All the
other threads are eliminated without any form of notice; no cancels
are sent and no handlers are run. However, all the other portions of
the address space are cloned, including all the mutex state. If the
other thread has a mutex locked, the mutex will be locked in the child
process, but the lock owner will not exist to unlock it. Therefore,
the resource protected by the lock will be permanently unavailable.
在这里您可以找到 a more reputable source 以及更多详细信息
回答你的另一个问题:
"how can I launch a daemon inside my message handler?"
我假设您想实现经典的 unix "fork() on accept()" 模型。
在那种情况下,你就不走运了,因为 actix-web 和 async/await 等服务器
一般来说,在设计时并没有考虑到这一点。即使你有一个
single-threaded async/await 服务器,然后:
当 child 分叉时,它会继承 parent 的所有文件描述符。所以就是
common 在 fork 之后,child 关闭它的监听套接字以避免
资源泄漏 - 但在任何基于 async/await 的服务器上都无法做到这一点,
不是因为做不到,而是因为没有实现。
更重要的原因是为了防止 child 过程
从接受新连接 - 因为即使你 运行 一个单线程
服务器,它仍然能够同时处理许多任务——即
当你的处理程序调用 .await
时,接受者可以自由地
接受一个新连接(通过从套接字队列中窃取它)并开始处理它。
您的 parent 服务器可能已经生成了很多任务,这些任务将是
在每个分叉 child 中复制,因此多次执行相同的事情,
在每个过程中独立
好吧......没有办法阻止任何 async/await
基于我熟悉的服务器。您需要一个自定义服务器:
- 如果它是 child 并且检测到它是 child,则检查它的接受器任务
它应该关闭侦听套接字并删除接受器。
- 它不应执行从 parent 派生的任何其他任务,
但没有办法实现。
换句话说 - async/await 和“fork() on accept()”是两个不同的
不兼容 个并发处理任务的模型。
一个可能的解决方案是有一个 non-async 接受器守护进程,它只
接受连接并分叉自身。然后在 child 中生成一个 web-server
然后将其送入接受的插座。但是尽管可能,none 的服务器
目前对此有支持。
如另一个答案中所述,如果您在子进程中触摸它,您所依赖的异步运行时可能会完全中断。触摸任何东西都可以完全打破 actix 或 tokio 开发者所做的假设。如果你从这个函数中 return 就会发生古怪的事情。
请参阅 this response by one of the key authors of tokio 某人做类似的事情(在 hyper 的线程池上下文中调用 fork()
):
Threads + fork is bad news... you can fork if you immediately exec and do not allocate memory or perform any other operation that may have been corrupted by the fork.
回到你的问题:
The objective is for my websocket to respond to messages and be able to launch isolated long-running processes that launch successfully and do not exit when the websocket exits.
我认为您根本不想手动 fork()
。 actix/tokio 提供的实用函数应该与其运行时很好地集成。您可以:
- 运行 阻塞或 CPU-专用线程中的大量代码
actix_web::block
- 使用
actix::AsyncContext::spawn
. You would ideally want to use e.g. tokio::process::Command
而不是 std
版本生成未来以避免在异步上下文中阻塞。
- 如果您在子进程中所做的只是 运行
Command::new()
和后来的 Command::spawn()
,我很确定您可以直接调用它。无需分叉; it does that internally.
鉴于 WebSocket server with Actix 的基本设置,我如何在我的消息处理程序中启动守护进程?
我已经扩展了上面链接的示例起始代码,以使用 fork crate 调用 daemon(false, true)
。
use actix::{Actor, StreamHandler};
use actix_web::{web, App, Error, HttpRequest, HttpResponse, HttpServer};
use actix_web_actors::ws;
use fork::{daemon, Fork};
/// Define HTTP actor
struct MyWs;
impl Actor for MyWs {
type Context = ws::WebsocketContext<Self>;
}
/// Handler for ws::Message message
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for MyWs {
fn handle(
&mut self,
msg: Result<ws::Message, ws::ProtocolError>,
ctx: &mut Self::Context,
) {
match msg {
Ok(ws::Message::Ping(msg)) => ctx.pong(&msg),
Ok(ws::Message::Text(text)) => {
println!("text message received");
if let Ok(Fork::Child) = daemon(false, true) {
println!("from daemon: this print but then the websocket crashes!");
};
ctx.text(text)
},
Ok(ws::Message::Binary(bin)) => ctx.binary(bin),
_ => (),
}
}
}
async fn index(req: HttpRequest, stream: web::Payload) -> Result<HttpResponse, Error> {
let resp = ws::start(MyWs {}, &req, stream);
println!("{:?}", resp);
resp
}
#[actix_web::main]
async fn main() -> std::io::Result<()> {
HttpServer::new(|| App::new().route("/ws/", web::get().to(index)))
.bind("127.0.0.1:8080")?
.run()
.await
}
上面的代码启动了服务器,但是当我向它发送消息时,我收到了 Panic in Arbiter thread
。
text message received
from daemon: this print but then the websocket crashes!
thread 'actix-rt:worker:0' panicked at 'failed to park', /Users/xxx/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.25/src/runtime/basic_scheduler.rs:158:56
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
Panic in Arbiter thread.
您的应用程序的问题是 actix-web 运行 时间(即 Tokio)是 multi-threaded。这是一个问题,因为 fork()
调用(由 daemon()
内部使用)仅复制调用 fork()
.
即使您的 parent 进程有 N 个线程,您的 child 进程也只有 1 个。如果您的 parent 进程有任何被这些线程锁定的互斥锁,它们的状态将是在 child 进程中复制,但由于这些线程不存在,它们将永远保持锁定状态。
如果你有一个 Rc
/Arc
它永远不会 de-allocate 它的内存,因为它永远不会被删除,因此它的内部计数永远不会达到零。这同样适用于任何指针和共享状态。
或者更简单地说 - 你的分叉 child 将以未定义状态结束。
这在 Calling fork() in a Multithreaded Environment 中得到了最好的解释:
The fork( ) system call creates an exact duplicate of the address space from which it is called, resulting in two address spaces executing the same code. Problems can occur if the forking address space has multiple threads executing at the time of the fork( ). When multithreading is a result of library invocation, threads are not necessarily aware of each other's presence, purpose, actions, and so on. Suppose that one of the other threads (any thread other than the one doing the fork( )) has the job of deducting money from your checking account. Clearly, you do not want this to happen twice as a result of some other thread's decision to call fork( ).
Because of these types of problems, which in general are problems of threads modifying persistent state, POSIX defined the behavior of fork( ) in the presence of threads to propagate only the forking thread. This solves the problem of improper changes being made to persistent state. However, it causes other problems, as discussed in the next paragraph.
In the POSIX model, only the forking thread is propagated. All the other threads are eliminated without any form of notice; no cancels are sent and no handlers are run. However, all the other portions of the address space are cloned, including all the mutex state. If the other thread has a mutex locked, the mutex will be locked in the child process, but the lock owner will not exist to unlock it. Therefore, the resource protected by the lock will be permanently unavailable.
在这里您可以找到 a more reputable source 以及更多详细信息
回答你的另一个问题:
"how can I launch a daemon inside my message handler?"
我假设您想实现经典的 unix "fork() on accept()" 模型。 在那种情况下,你就不走运了,因为 actix-web 和 async/await 等服务器 一般来说,在设计时并没有考虑到这一点。即使你有一个 single-threaded async/await 服务器,然后:
当 child 分叉时,它会继承 parent 的所有文件描述符。所以就是 common 在 fork 之后,child 关闭它的监听套接字以避免 资源泄漏 - 但在任何基于 async/await 的服务器上都无法做到这一点, 不是因为做不到,而是因为没有实现。
更重要的原因是为了防止 child 过程 从接受新连接 - 因为即使你 运行 一个单线程 服务器,它仍然能够同时处理许多任务——即 当你的处理程序调用
.await
时,接受者可以自由地 接受一个新连接(通过从套接字队列中窃取它)并开始处理它。您的 parent 服务器可能已经生成了很多任务,这些任务将是 在每个分叉 child 中复制,因此多次执行相同的事情, 在每个过程中独立
好吧......没有办法阻止任何 async/await 基于我熟悉的服务器。您需要一个自定义服务器:
- 如果它是 child 并且检测到它是 child,则检查它的接受器任务 它应该关闭侦听套接字并删除接受器。
- 它不应执行从 parent 派生的任何其他任务, 但没有办法实现。
换句话说 - async/await 和“fork() on accept()”是两个不同的 不兼容 个并发处理任务的模型。
一个可能的解决方案是有一个 non-async 接受器守护进程,它只 接受连接并分叉自身。然后在 child 中生成一个 web-server 然后将其送入接受的插座。但是尽管可能,none 的服务器 目前对此有支持。
如另一个答案中所述,如果您在子进程中触摸它,您所依赖的异步运行时可能会完全中断。触摸任何东西都可以完全打破 actix 或 tokio 开发者所做的假设。如果你从这个函数中 return 就会发生古怪的事情。
请参阅 this response by one of the key authors of tokio 某人做类似的事情(在 hyper 的线程池上下文中调用 fork()
):
Threads + fork is bad news... you can fork if you immediately exec and do not allocate memory or perform any other operation that may have been corrupted by the fork.
回到你的问题:
The objective is for my websocket to respond to messages and be able to launch isolated long-running processes that launch successfully and do not exit when the websocket exits.
我认为您根本不想手动 fork()
。 actix/tokio 提供的实用函数应该与其运行时很好地集成。您可以:
- 运行 阻塞或 CPU-专用线程中的大量代码
actix_web::block
- 使用
actix::AsyncContext::spawn
. You would ideally want to use e.g.tokio::process::Command
而不是std
版本生成未来以避免在异步上下文中阻塞。 - 如果您在子进程中所做的只是 运行
Command::new()
和后来的Command::spawn()
,我很确定您可以直接调用它。无需分叉; it does that internally.