如何将 postgres (deadpool-postgres) 与 WebSocket Actix (actix-web-actors) 一起使用

How use postgres (deadpool-postgres) with WebSocket Actix (actix-web-actors)

如何从 WebSocket 处理程序使用数据库连接。

可以使用 future.into_actorctx.spawnactix::fut::wrap_future 但这不准确:)

理论上,我不应该在向数据库发出请求时阻塞 ws 处理程序。我是否需要以某种方式将请求发送到单独线程中的某个执行程序?

未找到将数据库与 websockets 结合使用的任何信息或示例。

type PgPool = deadpool_r2d2::Pool<deadpool_postgres::Manager>;

pub struct MyWebSocket {
    db: deadpool::managed::Object<deadpool_postgres::Manager>    
}

impl MyWebSocket {
    pub fn new(client_db: deadpool::managed::Object<deadpool_postgres::Manager>) -> Self {
        Self { db:client_db }
    }
}

impl Actor for MyWebSocket {
    type Context = ws::WebsocketContext<Self>;
}

impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for MyWebSocket {
    fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
        match msg {
         Ok(ws::Message::Text(text)) => {

             /*
               How used this database?
               Error: only allowed inside `async` functions and blocks**
             */

             let stmt = self.db.prepare_cached("SELECT 1 + ").await.unwrap();
             let rows = self.db.query(&stmt, &[&3]).await.unwrap();
             let value: i32 = rows[0].get(0);
             ctx.text(format!("{}",value));

         },
         ....
        }
    }
}
fn create_pool(max_size: usize) -> PgPool {
    let config:tokio_postgres::Config = config().expect("Error configure");
    let mgr_config = ManagerConfig {
        recycling_method: RecyclingMethod::Fast
    };
    let mgr:deadpool_postgres::Manager = Manager::from_config(config, NoTls, mgr_config);
    let pool:deadpool_r2d2::Pool<deadpool_postgres::Manager> = 
        Pool::builder(mgr).runtime(deadpool_postgres::Runtime::Tokio1).max_size(max_size).build().unwrap();
    pool
} 

#[get("ws/")]
async fn ws_index(req: HttpRequest, stream: web::Payload,db_pool: web::Data<PgPool>) -> Result<HttpResponse, Error> {
    let client:deadpool::managed::Object<deadpool_postgres::Manager> = db_pool.get().await.unwrap();
    let resp = ws::start(MyWebSocket::new(client), &req, stream);
    resp
}

#[actix_web::main]
async fn main() -> std::io::Result<()> { 
    let pool:PgPool = create_pool(2);
    HttpServer::new(move|| {
        App::new()
            .app_data(web::Data::new(pool.clone()))
            .wrap( middleware::DefaultHeaders::new().header("Access-Control-Allow-Origin", "*"))
            .service(ws_index)
            .wrap(middleware::Logger::default())
    })
    .workers(2)
    .bind(("0.0.0.0", 4011))?
    .run()
    .await
}

控制台:

error[E0728]: `await` is only allowed inside `async` functions and blocks
   --> src/test_db.rs:89:33
    |
67  | /     fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
68  | |         
69  | |         
70  | |         match msg {
...   |
89  | |                      let stmt = self.db.prepare_cached("SELECT 1 + ").await.unwrap();
    | |                                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ only allowed inside `async` functions and blocks
...   |
156 | |         }
157 | |     }
    | |_____- this is not `async`

error[E0728]: `await` is only allowed inside `async` functions and blocks
   --> src/test_db.rs:90:33
    |
67  | /     fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
68  | |         
69  | |         
70  | |         match msg {
...   |
90  | |                      let rows = self.db.query(&stmt, &[&3]).await.unwrap();
    | |                                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ only allowed inside `async` functions and blocks
...   |
156 | |         }
157 | |     }
    | |_____- this is not `async`

这是不可能的,但您可以使用 tokio spawn task method 并在闭包内使用 async await。

像这样。

tokio::spawn(async move {
    let stmt = self.db.prepare_cached("SELECT 1 + ").await.unwrap();
    let rows = self.db.query(&stmt, &[&3]).await.unwrap();
    let value: i32 = rows[0].get(0);
    ctx.text(format!("{}",value));
});

您应该可以使用 WebsocketContext::spawn

let db = self.db.clone();
let fut = async move {
   let stmt = db.prepare_cached("SELECT 1 + ").await.unwrap();
   let rows = db.query(&stmt, &[&3]).await.unwrap();
   let value: i32 = rows[0].get(0);
};
let fut = actix::fut::wrap_future::<_, Self>(fut);
ctx.spawn(fut);

关于评论中的问题:

How to send back the result of the database operation? Use Shared State or Channels?

您面临的问题是无法移动ctx。 一个简单的方法是从克隆的 ctx 中获取演员的 address,然后你可以移动它。 这种方法需要一个额外的结构和 Handler.

的实现
struct WsMessage(String)

impl Handler<WsMessage> for MyWebSocket {
    type Result = ();

    fn handle(&mut self, msg: WsMessage, ctx: &mut Context<Self>) -> Self::Result {
        ctx.text(msg.0);
    }
}

所以现在你可以做:

let db = self.db.clone();
let addr = ctx.addess();
let fut = async move {
   let stmt = db.prepare_cached("SELECT 1 + ").await.unwrap();
   let rows = db.query(&stmt, &[&3]).await.unwrap();
   let value: i32 = rows[0].get(0);
   addr.send(WsMessage(String::from(value))).await.unwrap();
};
let fut = actix::fut::wrap_future::<_, Self>(fut);
ctx.spawn(fut);

编辑

想想看,上面的做法有点过分了

看看 ActorFuture 你应该可以做到:

let db = self.db.clone();

let fut = async move {
   let stmt = db.prepare_cached("SELECT 1 + ").await.unwrap();
   let rows = db.query(&stmt, &[&3]).await.unwrap();
   let value: i32 = rows[0].get(0);
   value
};
let fut = actix::fut::wrap_future::<_, Self>(fut);
let fut = fut.map(|result, actor, ctx| {
   ctx.text(result.to_string());
});
ctx.spawn(fut)