如何将 postgres (deadpool-postgres) 与 WebSocket Actix (actix-web-actors) 一起使用
How use postgres (deadpool-postgres) with WebSocket Actix (actix-web-actors)
如何从 WebSocket 处理程序使用数据库连接。
可以使用 future.into_actor
或 ctx.spawn
或 actix::fut::wrap_future
但这不准确:)
理论上,我不应该在向数据库发出请求时阻塞 ws 处理程序。我是否需要以某种方式将请求发送到单独线程中的某个执行程序?
未找到将数据库与 websockets 结合使用的任何信息或示例。
type PgPool = deadpool_r2d2::Pool<deadpool_postgres::Manager>;
pub struct MyWebSocket {
db: deadpool::managed::Object<deadpool_postgres::Manager>
}
impl MyWebSocket {
pub fn new(client_db: deadpool::managed::Object<deadpool_postgres::Manager>) -> Self {
Self { db:client_db }
}
}
impl Actor for MyWebSocket {
type Context = ws::WebsocketContext<Self>;
}
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for MyWebSocket {
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
match msg {
Ok(ws::Message::Text(text)) => {
/*
How used this database?
Error: only allowed inside `async` functions and blocks**
*/
let stmt = self.db.prepare_cached("SELECT 1 + ").await.unwrap();
let rows = self.db.query(&stmt, &[&3]).await.unwrap();
let value: i32 = rows[0].get(0);
ctx.text(format!("{}",value));
},
....
}
}
}
fn create_pool(max_size: usize) -> PgPool {
let config:tokio_postgres::Config = config().expect("Error configure");
let mgr_config = ManagerConfig {
recycling_method: RecyclingMethod::Fast
};
let mgr:deadpool_postgres::Manager = Manager::from_config(config, NoTls, mgr_config);
let pool:deadpool_r2d2::Pool<deadpool_postgres::Manager> =
Pool::builder(mgr).runtime(deadpool_postgres::Runtime::Tokio1).max_size(max_size).build().unwrap();
pool
}
#[get("ws/")]
async fn ws_index(req: HttpRequest, stream: web::Payload,db_pool: web::Data<PgPool>) -> Result<HttpResponse, Error> {
let client:deadpool::managed::Object<deadpool_postgres::Manager> = db_pool.get().await.unwrap();
let resp = ws::start(MyWebSocket::new(client), &req, stream);
resp
}
#[actix_web::main]
async fn main() -> std::io::Result<()> {
let pool:PgPool = create_pool(2);
HttpServer::new(move|| {
App::new()
.app_data(web::Data::new(pool.clone()))
.wrap( middleware::DefaultHeaders::new().header("Access-Control-Allow-Origin", "*"))
.service(ws_index)
.wrap(middleware::Logger::default())
})
.workers(2)
.bind(("0.0.0.0", 4011))?
.run()
.await
}
控制台:
error[E0728]: `await` is only allowed inside `async` functions and blocks
--> src/test_db.rs:89:33
|
67 | / fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
68 | |
69 | |
70 | | match msg {
... |
89 | | let stmt = self.db.prepare_cached("SELECT 1 + ").await.unwrap();
| | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ only allowed inside `async` functions and blocks
... |
156 | | }
157 | | }
| |_____- this is not `async`
error[E0728]: `await` is only allowed inside `async` functions and blocks
--> src/test_db.rs:90:33
|
67 | / fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
68 | |
69 | |
70 | | match msg {
... |
90 | | let rows = self.db.query(&stmt, &[&3]).await.unwrap();
| | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ only allowed inside `async` functions and blocks
... |
156 | | }
157 | | }
| |_____- this is not `async`
这是不可能的,但您可以使用 tokio spawn task method 并在闭包内使用 async await。
像这样。
tokio::spawn(async move {
let stmt = self.db.prepare_cached("SELECT 1 + ").await.unwrap();
let rows = self.db.query(&stmt, &[&3]).await.unwrap();
let value: i32 = rows[0].get(0);
ctx.text(format!("{}",value));
});
您应该可以使用 WebsocketContext::spawn
let db = self.db.clone();
let fut = async move {
let stmt = db.prepare_cached("SELECT 1 + ").await.unwrap();
let rows = db.query(&stmt, &[&3]).await.unwrap();
let value: i32 = rows[0].get(0);
};
let fut = actix::fut::wrap_future::<_, Self>(fut);
ctx.spawn(fut);
关于评论中的问题:
How to send back the result of the database operation? Use Shared
State or Channels?
您面临的问题是无法移动ctx
。
一个简单的方法是从克隆的 ctx
中获取演员的 address,然后你可以移动它。
这种方法需要一个额外的结构和 Handler.
的实现
struct WsMessage(String)
impl Handler<WsMessage> for MyWebSocket {
type Result = ();
fn handle(&mut self, msg: WsMessage, ctx: &mut Context<Self>) -> Self::Result {
ctx.text(msg.0);
}
}
所以现在你可以做:
let db = self.db.clone();
let addr = ctx.addess();
let fut = async move {
let stmt = db.prepare_cached("SELECT 1 + ").await.unwrap();
let rows = db.query(&stmt, &[&3]).await.unwrap();
let value: i32 = rows[0].get(0);
addr.send(WsMessage(String::from(value))).await.unwrap();
};
let fut = actix::fut::wrap_future::<_, Self>(fut);
ctx.spawn(fut);
编辑
想想看,上面的做法有点过分了
看看 ActorFuture 你应该可以做到:
let db = self.db.clone();
let fut = async move {
let stmt = db.prepare_cached("SELECT 1 + ").await.unwrap();
let rows = db.query(&stmt, &[&3]).await.unwrap();
let value: i32 = rows[0].get(0);
value
};
let fut = actix::fut::wrap_future::<_, Self>(fut);
let fut = fut.map(|result, actor, ctx| {
ctx.text(result.to_string());
});
ctx.spawn(fut)
如何从 WebSocket 处理程序使用数据库连接。
可以使用 future.into_actor
或 ctx.spawn
或 actix::fut::wrap_future
但这不准确:)
理论上,我不应该在向数据库发出请求时阻塞 ws 处理程序。我是否需要以某种方式将请求发送到单独线程中的某个执行程序?
未找到将数据库与 websockets 结合使用的任何信息或示例。
type PgPool = deadpool_r2d2::Pool<deadpool_postgres::Manager>;
pub struct MyWebSocket {
db: deadpool::managed::Object<deadpool_postgres::Manager>
}
impl MyWebSocket {
pub fn new(client_db: deadpool::managed::Object<deadpool_postgres::Manager>) -> Self {
Self { db:client_db }
}
}
impl Actor for MyWebSocket {
type Context = ws::WebsocketContext<Self>;
}
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for MyWebSocket {
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
match msg {
Ok(ws::Message::Text(text)) => {
/*
How used this database?
Error: only allowed inside `async` functions and blocks**
*/
let stmt = self.db.prepare_cached("SELECT 1 + ").await.unwrap();
let rows = self.db.query(&stmt, &[&3]).await.unwrap();
let value: i32 = rows[0].get(0);
ctx.text(format!("{}",value));
},
....
}
}
}
fn create_pool(max_size: usize) -> PgPool {
let config:tokio_postgres::Config = config().expect("Error configure");
let mgr_config = ManagerConfig {
recycling_method: RecyclingMethod::Fast
};
let mgr:deadpool_postgres::Manager = Manager::from_config(config, NoTls, mgr_config);
let pool:deadpool_r2d2::Pool<deadpool_postgres::Manager> =
Pool::builder(mgr).runtime(deadpool_postgres::Runtime::Tokio1).max_size(max_size).build().unwrap();
pool
}
#[get("ws/")]
async fn ws_index(req: HttpRequest, stream: web::Payload,db_pool: web::Data<PgPool>) -> Result<HttpResponse, Error> {
let client:deadpool::managed::Object<deadpool_postgres::Manager> = db_pool.get().await.unwrap();
let resp = ws::start(MyWebSocket::new(client), &req, stream);
resp
}
#[actix_web::main]
async fn main() -> std::io::Result<()> {
let pool:PgPool = create_pool(2);
HttpServer::new(move|| {
App::new()
.app_data(web::Data::new(pool.clone()))
.wrap( middleware::DefaultHeaders::new().header("Access-Control-Allow-Origin", "*"))
.service(ws_index)
.wrap(middleware::Logger::default())
})
.workers(2)
.bind(("0.0.0.0", 4011))?
.run()
.await
}
控制台:
error[E0728]: `await` is only allowed inside `async` functions and blocks
--> src/test_db.rs:89:33
|
67 | / fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
68 | |
69 | |
70 | | match msg {
... |
89 | | let stmt = self.db.prepare_cached("SELECT 1 + ").await.unwrap();
| | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ only allowed inside `async` functions and blocks
... |
156 | | }
157 | | }
| |_____- this is not `async`
error[E0728]: `await` is only allowed inside `async` functions and blocks
--> src/test_db.rs:90:33
|
67 | / fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
68 | |
69 | |
70 | | match msg {
... |
90 | | let rows = self.db.query(&stmt, &[&3]).await.unwrap();
| | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ only allowed inside `async` functions and blocks
... |
156 | | }
157 | | }
| |_____- this is not `async`
这是不可能的,但您可以使用 tokio spawn task method 并在闭包内使用 async await。
像这样。
tokio::spawn(async move {
let stmt = self.db.prepare_cached("SELECT 1 + ").await.unwrap();
let rows = self.db.query(&stmt, &[&3]).await.unwrap();
let value: i32 = rows[0].get(0);
ctx.text(format!("{}",value));
});
您应该可以使用 WebsocketContext::spawn
let db = self.db.clone();
let fut = async move {
let stmt = db.prepare_cached("SELECT 1 + ").await.unwrap();
let rows = db.query(&stmt, &[&3]).await.unwrap();
let value: i32 = rows[0].get(0);
};
let fut = actix::fut::wrap_future::<_, Self>(fut);
ctx.spawn(fut);
关于评论中的问题:
How to send back the result of the database operation? Use Shared State or Channels?
您面临的问题是无法移动ctx
。
一个简单的方法是从克隆的 ctx
中获取演员的 address,然后你可以移动它。
这种方法需要一个额外的结构和 Handler.
struct WsMessage(String)
impl Handler<WsMessage> for MyWebSocket {
type Result = ();
fn handle(&mut self, msg: WsMessage, ctx: &mut Context<Self>) -> Self::Result {
ctx.text(msg.0);
}
}
所以现在你可以做:
let db = self.db.clone();
let addr = ctx.addess();
let fut = async move {
let stmt = db.prepare_cached("SELECT 1 + ").await.unwrap();
let rows = db.query(&stmt, &[&3]).await.unwrap();
let value: i32 = rows[0].get(0);
addr.send(WsMessage(String::from(value))).await.unwrap();
};
let fut = actix::fut::wrap_future::<_, Self>(fut);
ctx.spawn(fut);
编辑
想想看,上面的做法有点过分了
看看 ActorFuture 你应该可以做到:
let db = self.db.clone();
let fut = async move {
let stmt = db.prepare_cached("SELECT 1 + ").await.unwrap();
let rows = db.query(&stmt, &[&3]).await.unwrap();
let value: i32 = rows[0].get(0);
value
};
let fut = actix::fut::wrap_future::<_, Self>(fut);
let fut = fut.map(|result, actor, ctx| {
ctx.text(result.to_string());
});
ctx.spawn(fut)