使用 Actix 构建 WebSocket 客户端
Build a WebSocket client using Actix
我之前发布了一个关于如何Add awc websocket client to add_stream in actix, which focused on how to add a stream to the actor from the AWC Client. I have solved that issue的问题,但我仍然需要能够通过发送消息与服务器通信。
那么,让我们从一些背景开始。这是我的演员:
use actix_web_actors::ws::{Frame, ProtocolError};
use awc::BoxedSocket;
use awc::ws::Codec;
use futures::StreamExt;
use log::info;
use openssl::ssl::SslConnector;
pub struct RosClient {
pub address: String,
pub connection: Option<Framed<BoxedSocket, Codec>>,
pub hb: Instant,
}
impl RosClient {
pub fn new(address: &str) -> Self {
Self {
address: address.to_string(),
connection: None,
hb: Instant::now(),
}
}
}
impl Actor for RosClient {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
info!("Connecting to ROS client on {}", &self.address);
let ssl = {
let mut ssl = SslConnector::builder(openssl::ssl::SslMethod::tls()).unwrap();
let _ = ssl.set_alpn_protos(b"\x08http/1.1");
ssl.build()
};
let connector = awc::Connector::new().ssl(ssl).finish();
let ws = awc::ClientBuilder::new()
.connector(connector)
.finish()
.ws(&self.address)
.set_header("Host", "0.0.0.0:9090");
let _message = serde_json::json!({
"op": "subscribe",
"topic": "/client_count"
});
ws.connect()
.into_actor(self)
.map(|res, _act, ctx| match res {
Ok((client_response, frame)) => {
info!("Response: {:?}", client_response);
let (_r, w) = frame.split();
let _ = ctx.add_stream(w);
}
Err(err) => {
info!("Websocket Client Actor failed to connect: {:?}", err);
ctx.stop();
}
})
.wait(ctx);
}
fn stopping(&mut self, _ctx: &mut Self::Context) -> Running {
Running::Stop
}
}
impl StreamHandler<Result<Frame, ProtocolError>> for RosClient {
fn handle(&mut self, item: Result<Frame, ProtocolError>, _ctx: &mut Self::Context) {
match item.unwrap() {
Frame::Text(text_bytes) => {
let text = std::str::from_utf8(text_bytes.as_ref()).unwrap();
info!("Message: {}", text);
}
Frame::Binary(_) => {}
Frame::Continuation(_) => {}
Frame::Ping(_) => {
info!("Ping received!");
}
Frame::Pong(_) => {
self.hb = Instant::now();
}
Frame::Close(_) => {}
}
}
}
我如何保留对连接的引用(或句柄、副本或任何执行此工作的内容),以便在我实现消息处理程序时,我可以通过例如将数据发送到服务器,
Message::Text(message.to_string())
在调整了一些东西之后,我让它工作了。即使从您之前的问题来看,问题在于您如何处理连接创建。
板条箱 actix-web-actors
中有一个很好的参考,其中的模式类似于:
pub fn start_with_addr<A, T>(
actor: A,
req: &HttpRequest,
stream: T
) -> Result<(Addr<A>, HttpResponse), Error>
在你的情况下,这就是我想出的:
use actix::io::SinkWrite;
use actix::prelude::*;
use actix_codec::Framed;
use awc::{error::WsProtocolError, ws, BoxedSocket, Client};
use futures::stream::{SplitSink, SplitStream};
use futures_util::stream::StreamExt;
use log::{error, info};
use openssl::ssl::SslConnector;
type WsFramedSink = SplitSink<Framed<BoxedSocket, ws::Codec>, ws::Message>;
type WsFramedStream = SplitStream<Framed<BoxedSocket, ws::Codec>>;
struct RosClient {
sink: SinkWrite<ws::Message, WsFramedSink>,
}
impl RosClient {
pub fn start(sink: WsFramedSink, stream: WsFramedStream) -> Addr<Self> {
RosClient::create(|ctx| {
ctx.add_stream(stream);
RosClient {
sink: SinkWrite::new(sink, ctx),
}
})
}
}
impl Actor for RosClient {
type Context = Context<Self>;
fn started(&mut self, _ctx: &mut Context<Self>) {
info!("RosClient started");
}
}
impl actix::io::WriteHandler<WsProtocolError> for RosClient {}
#[derive(Message, Debug)]
#[rtype(result = "()")]
struct Event {
op: String,
topic: String,
}
impl Handler<Event> for RosClient {
type Result = ();
fn handle(&mut self, msg: Event, _ctx: &mut Self::Context) {
info!("Pushing Message {:?}", msg);
if let Some(error) = self
.sink
.write(ws::Message::Text(format!("{:?}", msg).into()))
{
error!("Error RosClient {:?}", error);
}
}
}
impl StreamHandler<Result<ws::Frame, WsProtocolError>> for RosClient {
fn handle(&mut self, item: Result<ws::Frame, WsProtocolError>, _ctx: &mut Self::Context) {
use ws::Frame;
match item.unwrap() {
Frame::Text(text_bytes) => {
let text = std::str::from_utf8(text_bytes.as_ref()).unwrap();
info!("Receiving Message: {}", text);
}
Frame::Binary(_) => {}
Frame::Continuation(_) => {}
Frame::Ping(_) => {
info!("Ping received!");
}
Frame::Pong(_) => {
//self.hb = Instant::now();
}
Frame::Close(_) => {}
}
}
}
#[actix_rt::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
std::env::set_var("RUST_LOG", "info");
env_logger::init();
let _ssl = {
let mut ssl = SslConnector::builder(openssl::ssl::SslMethod::tls()).unwrap();
let _ = ssl.set_alpn_protos(b"\x08http/1.1");
ssl.build()
};
//let connector = awc::Connector::new().ssl(ssl).finish();
let (_, framed) = Client::default()
.ws("http://localhost:8080")
.connect()
.await?;
let (sink, stream): (WsFramedSink, WsFramedStream) = framed.split();
let addr = RosClient::start(sink, stream);
let _res = addr
.send(Event {
op: format!("subscribe"),
topic: "/client_count".to_string(),
})
.await
.unwrap();
let _ = actix_rt::signal::ctrl_c().await?;
Ok(())
}
我写了一个简单的node.js服务器(没有ssl):
const { WebSocketServer } = require('ws');
const wss = new WebSocketServer({ port: 8080 });
wss.on('connection', function connection(ws) {
ws.on('message', function message(data) {
console.log('received: %s', data);
});
ws.send('something');
});
而且效果很好:
[2021-12-01T07:08:03Z INFO actix-wc-client] RosClient started
[2021-12-01T07:08:03Z INFO actix-wc-client] Pushing Message Event { op: "subscribe", topic: "/client_count" }
[2021-12-01T07:08:03Z INFO actix-wc-client] Receiving Message: something
您可能需要更新一些actix-* 版本。这是我的 Cargo.toml
文件:
[package]
name = "actix-wc-client"
version = "0.1.0"
edition = "2018"
[dependencies]
awc = "3.0.0-beta.9"
openssl = { version = "0.10" }
log = { version = "0.4" }
futures = "0.3"
actix = "0.11"
actix-web = "4.0.0-beta.10"
serde = "1"
serde_json = "1"
actix-codec = "0.4"
actix-rt = "2.5"
futures-util = "0.3"
actix-http = "3.0.0-beta.11"
env_logger = "0.7"
我之前发布了一个关于如何Add awc websocket client to add_stream in actix, which focused on how to add a stream to the actor from the AWC Client. I have solved that issue的问题,但我仍然需要能够通过发送消息与服务器通信。
那么,让我们从一些背景开始。这是我的演员:
use actix_web_actors::ws::{Frame, ProtocolError};
use awc::BoxedSocket;
use awc::ws::Codec;
use futures::StreamExt;
use log::info;
use openssl::ssl::SslConnector;
pub struct RosClient {
pub address: String,
pub connection: Option<Framed<BoxedSocket, Codec>>,
pub hb: Instant,
}
impl RosClient {
pub fn new(address: &str) -> Self {
Self {
address: address.to_string(),
connection: None,
hb: Instant::now(),
}
}
}
impl Actor for RosClient {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
info!("Connecting to ROS client on {}", &self.address);
let ssl = {
let mut ssl = SslConnector::builder(openssl::ssl::SslMethod::tls()).unwrap();
let _ = ssl.set_alpn_protos(b"\x08http/1.1");
ssl.build()
};
let connector = awc::Connector::new().ssl(ssl).finish();
let ws = awc::ClientBuilder::new()
.connector(connector)
.finish()
.ws(&self.address)
.set_header("Host", "0.0.0.0:9090");
let _message = serde_json::json!({
"op": "subscribe",
"topic": "/client_count"
});
ws.connect()
.into_actor(self)
.map(|res, _act, ctx| match res {
Ok((client_response, frame)) => {
info!("Response: {:?}", client_response);
let (_r, w) = frame.split();
let _ = ctx.add_stream(w);
}
Err(err) => {
info!("Websocket Client Actor failed to connect: {:?}", err);
ctx.stop();
}
})
.wait(ctx);
}
fn stopping(&mut self, _ctx: &mut Self::Context) -> Running {
Running::Stop
}
}
impl StreamHandler<Result<Frame, ProtocolError>> for RosClient {
fn handle(&mut self, item: Result<Frame, ProtocolError>, _ctx: &mut Self::Context) {
match item.unwrap() {
Frame::Text(text_bytes) => {
let text = std::str::from_utf8(text_bytes.as_ref()).unwrap();
info!("Message: {}", text);
}
Frame::Binary(_) => {}
Frame::Continuation(_) => {}
Frame::Ping(_) => {
info!("Ping received!");
}
Frame::Pong(_) => {
self.hb = Instant::now();
}
Frame::Close(_) => {}
}
}
}
我如何保留对连接的引用(或句柄、副本或任何执行此工作的内容),以便在我实现消息处理程序时,我可以通过例如将数据发送到服务器,
Message::Text(message.to_string())
在调整了一些东西之后,我让它工作了。即使从您之前的问题来看,问题在于您如何处理连接创建。
板条箱 actix-web-actors
中有一个很好的参考,其中的模式类似于:
pub fn start_with_addr<A, T>(
actor: A,
req: &HttpRequest,
stream: T
) -> Result<(Addr<A>, HttpResponse), Error>
在你的情况下,这就是我想出的:
use actix::io::SinkWrite;
use actix::prelude::*;
use actix_codec::Framed;
use awc::{error::WsProtocolError, ws, BoxedSocket, Client};
use futures::stream::{SplitSink, SplitStream};
use futures_util::stream::StreamExt;
use log::{error, info};
use openssl::ssl::SslConnector;
type WsFramedSink = SplitSink<Framed<BoxedSocket, ws::Codec>, ws::Message>;
type WsFramedStream = SplitStream<Framed<BoxedSocket, ws::Codec>>;
struct RosClient {
sink: SinkWrite<ws::Message, WsFramedSink>,
}
impl RosClient {
pub fn start(sink: WsFramedSink, stream: WsFramedStream) -> Addr<Self> {
RosClient::create(|ctx| {
ctx.add_stream(stream);
RosClient {
sink: SinkWrite::new(sink, ctx),
}
})
}
}
impl Actor for RosClient {
type Context = Context<Self>;
fn started(&mut self, _ctx: &mut Context<Self>) {
info!("RosClient started");
}
}
impl actix::io::WriteHandler<WsProtocolError> for RosClient {}
#[derive(Message, Debug)]
#[rtype(result = "()")]
struct Event {
op: String,
topic: String,
}
impl Handler<Event> for RosClient {
type Result = ();
fn handle(&mut self, msg: Event, _ctx: &mut Self::Context) {
info!("Pushing Message {:?}", msg);
if let Some(error) = self
.sink
.write(ws::Message::Text(format!("{:?}", msg).into()))
{
error!("Error RosClient {:?}", error);
}
}
}
impl StreamHandler<Result<ws::Frame, WsProtocolError>> for RosClient {
fn handle(&mut self, item: Result<ws::Frame, WsProtocolError>, _ctx: &mut Self::Context) {
use ws::Frame;
match item.unwrap() {
Frame::Text(text_bytes) => {
let text = std::str::from_utf8(text_bytes.as_ref()).unwrap();
info!("Receiving Message: {}", text);
}
Frame::Binary(_) => {}
Frame::Continuation(_) => {}
Frame::Ping(_) => {
info!("Ping received!");
}
Frame::Pong(_) => {
//self.hb = Instant::now();
}
Frame::Close(_) => {}
}
}
}
#[actix_rt::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
std::env::set_var("RUST_LOG", "info");
env_logger::init();
let _ssl = {
let mut ssl = SslConnector::builder(openssl::ssl::SslMethod::tls()).unwrap();
let _ = ssl.set_alpn_protos(b"\x08http/1.1");
ssl.build()
};
//let connector = awc::Connector::new().ssl(ssl).finish();
let (_, framed) = Client::default()
.ws("http://localhost:8080")
.connect()
.await?;
let (sink, stream): (WsFramedSink, WsFramedStream) = framed.split();
let addr = RosClient::start(sink, stream);
let _res = addr
.send(Event {
op: format!("subscribe"),
topic: "/client_count".to_string(),
})
.await
.unwrap();
let _ = actix_rt::signal::ctrl_c().await?;
Ok(())
}
我写了一个简单的node.js服务器(没有ssl):
const { WebSocketServer } = require('ws');
const wss = new WebSocketServer({ port: 8080 });
wss.on('connection', function connection(ws) {
ws.on('message', function message(data) {
console.log('received: %s', data);
});
ws.send('something');
});
而且效果很好:
[2021-12-01T07:08:03Z INFO actix-wc-client] RosClient started
[2021-12-01T07:08:03Z INFO actix-wc-client] Pushing Message Event { op: "subscribe", topic: "/client_count" }
[2021-12-01T07:08:03Z INFO actix-wc-client] Receiving Message: something
您可能需要更新一些actix-* 版本。这是我的 Cargo.toml
文件:
[package]
name = "actix-wc-client"
version = "0.1.0"
edition = "2018"
[dependencies]
awc = "3.0.0-beta.9"
openssl = { version = "0.10" }
log = { version = "0.4" }
futures = "0.3"
actix = "0.11"
actix-web = "4.0.0-beta.10"
serde = "1"
serde_json = "1"
actix-codec = "0.4"
actix-rt = "2.5"
futures-util = "0.3"
actix-http = "3.0.0-beta.11"
env_logger = "0.7"