Build a WebSocket client using Actix

I previously posted a question, "Add awc websocket client to add_stream in actix", which focused on how to add a stream to the actor from the AWC client. I have solved that issue, but I still need to be able to communicate with the server by sending messages.

So, let's start with some context. Here is my actor:

use actix::prelude::*;
use actix_codec::Framed;
use actix_web_actors::ws::{Frame, ProtocolError};
use awc::BoxedSocket;
use awc::ws::Codec;
use futures::StreamExt;
use log::info;
use openssl::ssl::SslConnector;
use std::time::Instant;

pub struct RosClient {
    pub address: String,
    pub connection: Option<Framed<BoxedSocket, Codec>>,
    pub hb: Instant,
}

impl RosClient {
    pub fn new(address: &str) -> Self {
        Self {
            address: address.to_string(),
            connection: None,
            hb: Instant::now(),
        }
    }
}

impl Actor for RosClient {
    type Context = Context<Self>;

    fn started(&mut self, ctx: &mut Self::Context) {
        info!("Connecting to ROS client on {}", &self.address);
        let ssl = {
            let mut ssl = SslConnector::builder(openssl::ssl::SslMethod::tls()).unwrap();
            let _ = ssl.set_alpn_protos(b"\x08http/1.1");
            ssl.build()
        };
        let connector = awc::Connector::new().ssl(ssl).finish();
        let ws = awc::ClientBuilder::new()
            .connector(connector)
            .finish()
            .ws(&self.address)
            .set_header("Host", "0.0.0.0:9090");

        let _message = serde_json::json!({
            "op": "subscribe",
            "topic": "/client_count"
        });

        ws.connect()
            .into_actor(self)
            .map(|res, _act, ctx| match res {
                Ok((client_response, frame)) => {
                    info!("Response: {:?}", client_response);
                    let (_r, w) = frame.split();
                    let _ = ctx.add_stream(w);
                }
                Err(err) => {
                    info!("Websocket Client Actor failed to connect: {:?}", err);
                    ctx.stop();
                }
            })
            .wait(ctx);
    }

    fn stopping(&mut self, _ctx: &mut Self::Context) -> Running {
        Running::Stop
    }
}

impl StreamHandler<Result<Frame, ProtocolError>> for RosClient {
    fn handle(&mut self, item: Result<Frame, ProtocolError>, _ctx: &mut Self::Context) {
        match item.unwrap() {
            Frame::Text(text_bytes) => {
                let text = std::str::from_utf8(text_bytes.as_ref()).unwrap();
                info!("Message: {}", text);
            }
            Frame::Binary(_) => {}
            Frame::Continuation(_) => {}
            Frame::Ping(_) => {
                info!("Ping received!");
            }
            Frame::Pong(_) => {
                self.hb = Instant::now();
            }
            Frame::Close(_) => {}
        }
    }
}

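How can I keep a reference to the connection (or a handle, a copy, or whatever does the job) so that, when I implement a message handler, I can send data to the server with, for example: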

Message::Text(message.to_string()) 

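After tweaking a few things, I got it working. Judging from your previous question as well, the problem is in how you handle connection creation. The actix-web-actors crate has a good reference, where the pattern looks like this: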

pub fn start_with_addr<A, T>(
    actor: A, 
    req: &HttpRequest, 
    stream: T
) -> Result<(Addr<A>, HttpResponse), Error> 

In your case, this is what I came up with:

use actix::io::SinkWrite;
use actix::prelude::*;
use actix_codec::Framed;
use awc::{error::WsProtocolError, ws, BoxedSocket, Client};
use futures::stream::{SplitSink, SplitStream};
use futures_util::stream::StreamExt;
use log::{error, info};
use openssl::ssl::SslConnector;

type WsFramedSink = SplitSink<Framed<BoxedSocket, ws::Codec>, ws::Message>;
type WsFramedStream = SplitStream<Framed<BoxedSocket, ws::Codec>>;
struct RosClient {
    sink: SinkWrite<ws::Message, WsFramedSink>,
}

impl RosClient {
    pub fn start(sink: WsFramedSink, stream: WsFramedStream) -> Addr<Self> {
        RosClient::create(|ctx| {
            ctx.add_stream(stream);
            RosClient {
                sink: SinkWrite::new(sink, ctx),
            }
        })
    }
}
impl Actor for RosClient {
    type Context = Context<Self>;

    fn started(&mut self, _ctx: &mut Context<Self>) {
        info!("RosClient started");
    }
}

impl actix::io::WriteHandler<WsProtocolError> for RosClient {}

#[derive(Message, Debug)]
#[rtype(result = "()")]
struct Event {
    op: String,
    topic: String,
}
impl Handler<Event> for RosClient {
    type Result = ();

    fn handle(&mut self, msg: Event, _ctx: &mut Self::Context) {
        info!("Pushing Message {:?}", msg);
        if let Some(error) = self
            .sink
            .write(ws::Message::Text(format!("{:?}", msg).into()))
        {
            error!("Error RosClient {:?}", error);
        }
    }
}

impl StreamHandler<Result<ws::Frame, WsProtocolError>> for RosClient {
    fn handle(&mut self, item: Result<ws::Frame, WsProtocolError>, _ctx: &mut Self::Context) {
        use ws::Frame;
        match item.unwrap() {
            Frame::Text(text_bytes) => {
                let text = std::str::from_utf8(text_bytes.as_ref()).unwrap();
                info!("Receiving Message: {}", text);
            }
            Frame::Binary(_) => {}
            Frame::Continuation(_) => {}
            Frame::Ping(_) => {
                info!("Ping received!");
            }
            Frame::Pong(_) => {
                //self.hb = Instant::now();
            }
            Frame::Close(_) => {}
        }
    }
}

#[actix_rt::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    std::env::set_var("RUST_LOG", "info");
    env_logger::init();
    let _ssl = {
        let mut ssl = SslConnector::builder(openssl::ssl::SslMethod::tls()).unwrap();
        let _ = ssl.set_alpn_protos(b"\x08http/1.1");
        ssl.build()
    };
    //let connector = awc::Connector::new().ssl(ssl).finish();
    let (_, framed) = Client::default()
        .ws("http://localhost:8080")
        .connect()
        .await?;
    let (sink, stream): (WsFramedSink, WsFramedStream) = framed.split();
    let addr = RosClient::start(sink, stream);

    let _res = addr
        .send(Event {
            op: "subscribe".to_string(),
            topic: "/client_count".to_string(),
        })
        .await
        .unwrap();
    let _ = actix_rt::signal::ctrl_c().await?;
    Ok(())
}
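
One caveat about the stream handler above: `item.unwrap()` and `from_utf8(...).unwrap()` will panic on a protocol error or on a text frame that is not valid UTF-8. Below is a minimal, std-only sketch of a more forgiving decode step. `log_line` is a hypothetical helper name (not an actix or awc API), and the `Result<&[u8], &str>` argument is only a stand-in for `Result<ws::Frame, WsProtocolError>` so the snippet compiles on its own. It assumes that replacing invalid bytes with U+FFFD is acceptable for logging.

```rust
/// Hypothetical helper, not part of actix or awc.
/// `Ok(bytes)` stands in for the payload of a text frame and
/// `Err(msg)` stands in for a protocol error.
pub fn log_line(item: Result<&[u8], &str>) -> String {
    match item {
        // from_utf8_lossy never fails: invalid sequences become U+FFFD.
        Ok(bytes) => format!("Receiving Message: {}", String::from_utf8_lossy(bytes)),
        // Report the error instead of unwrapping and panicking.
        Err(err) => format!("Protocol error: {}", err),
    }
}
```

In the real handler the same idea would be a `match item { Ok(frame) => ..., Err(e) => ... }` around the existing frame match, probably with `ctx.stop()` in the error arm.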

I wrote a simple Node.js server (without SSL):

const { WebSocketServer } = require('ws');
const wss = new WebSocketServer({ port: 8080 });

wss.on('connection', function connection(ws) {
    ws.on('message', function message(data) {
        console.log('received: %s', data);
    });

    ws.send('something');
});

And it works fine:

[2021-12-01T07:08:03Z INFO  actix-wc-client] RosClient started
[2021-12-01T07:08:03Z INFO  actix-wc-client] Pushing Message Event { op: "subscribe", topic: "/client_count" }
[2021-12-01T07:08:03Z INFO  actix-wc-client] Receiving Message: something
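
Note from the log above that the handler sends the `Debug` rendering of `Event` (`Event { op: "subscribe", ... }`). The Node test server prints that happily, but it is not JSON. The question's actor built a JSON object with `op` and `topic` keys, so a server expecting that shape would probably need something like the sketch below. It is std-only to stay self-contained. `escape_json` and `to_json` are hypothetical helper names, and in a real project `serde_json` (already listed in the dependencies) would be the more natural choice.

```rust
/// Same fields as the `Event` message in the answer.
#[derive(Debug)]
pub struct Event {
    pub op: String,
    pub topic: String,
}

/// Hypothetical helper: escape a string for use inside a JSON string literal.
fn escape_json(input: &str) -> String {
    let mut out = String::with_capacity(input.len());
    for c in input.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            // Remaining control characters use the \uXXXX form.
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out
}

impl Event {
    /// Render the event as a compact JSON object with `op` and `topic` keys.
    pub fn to_json(&self) -> String {
        format!(
            "{{\"op\":\"{}\",\"topic\":\"{}\"}}",
            escape_json(&self.op),
            escape_json(&self.topic)
        )
    }
}
```

In the handler, `ws::Message::Text(msg.to_json().into())` would then take the place of the `format!("{:?}", msg)` call.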

You may need to update some of the actix-* versions. Here is my Cargo.toml file:

[package]
name = "actix-wc-client"
version = "0.1.0"
edition = "2018"

[dependencies]
awc = "3.0.0-beta.9"
openssl = { version = "0.10" }
log = { version = "0.4" }
futures = "0.3"
actix = "0.11"
actix-web = "4.0.0-beta.10"
serde = "1"
serde_json = "1"
actix-codec = "0.4"
actix-rt = "2.5"
futures-util = "0.3"
actix-http = "3.0.0-beta.11"
env_logger = "0.7"
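
One thing my version leaves out is the heartbeat: the question's actor kept an `hb: Instant` field, and I commented out the `Pong` arm. If you want it back, the usual idea is to record when the last pong arrived and compare that against a timeout from a timer. Here is a std-only sketch of just that bookkeeping. The `Heartbeat` type and the timeout value are assumptions for illustration, and wiring it up with `ctx.run_interval` and the sink is not shown.

```rust
use std::time::{Duration, Instant};

/// Hypothetical bookkeeping type; not part of actix or awc.
pub struct Heartbeat {
    last_pong: Instant,
    timeout: Duration,
}

impl Heartbeat {
    /// Start tracking from `now`, treating the connection as freshly alive.
    pub fn new(now: Instant, timeout: Duration) -> Self {
        Heartbeat { last_pong: now, timeout }
    }

    /// Call this from the `Frame::Pong` arm of the stream handler.
    pub fn on_pong(&mut self, now: Instant) {
        self.last_pong = now;
    }

    /// True when no pong has been seen for longer than the timeout.
    pub fn is_expired(&self, now: Instant) -> bool {
        // saturating_duration_since avoids a panic if `now` is earlier.
        now.saturating_duration_since(self.last_pong) > self.timeout
    }
}
```

A periodic check would call `is_expired(Instant::now())` and stop the actor when it returns true, otherwise send a ping through the sink.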