UdpFramed 与 Actix Rust。无法使用 SinkWrite 发送消息
UdpFramed with Actix Rust. Can't send messages using SinkWrite
我正在尝试使用 Actix 编写 Udp 客户端 Actor。我遵循了这个示例,UDP-Echo,但我似乎无法使用 UdpFramed tokio 结构向服务器发送消息。
这是我目前所拥有的,这是 Udp Client Actor 实现
use std::collections::HashMap;
use std::net::{SocketAddr};
use actix_rt::net::UdpSocket;
use actix::{Actor, Addr, AsyncContext, Context, Handler, StreamHandler, Message};
use actix::io::SinkWrite;
use actix_web::web::{Bytes, BytesMut};
use futures_util::stream::{SplitSink};
use futures_util::StreamExt;
use log::info;
use serde_json::Value;
use tokio_util::codec::BytesCodec;
use tokio_util::udp::UdpFramed;
use crate::rosclient::messages::Subscribe;
use std::io::Result;
mod messages;
type SinkItem = (Bytes, SocketAddr);
type UdpSink = SplitSink<UdpFramed<BytesCodec, UdpSocket>, SinkItem>;
pub struct UdpClientActor {
pub address: SocketAddr,
pub sink: SinkWrite<SinkItem, UdpSink>,
}
impl UdpClientActor {
pub fn start(udp: UdpSocket, address: SocketAddr) -> Addr<UdpClientActor> {
let framed = UdpFramed::new(udp, BytesCodec::new());
let (split_sink, split_stream) = framed.split();
UdpClientActor::create(|ctx| {
ctx.add_stream(split_stream.filter_map(
|item: Result<(BytesMut, SocketAddr)>| async {
item.map(|(data, sender)| UdpPacket(data, sender)).ok()
},
));
UdpClientActor {
address,
sink: SinkWrite::new(split_sink, ctx),
}
})
}
}
impl Actor for UdpClientActor {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
let mut hashmap = HashMap::new();
hashmap.insert(String::from("topic"), Value::String(String::from("/client_count")));
let subscription = Subscribe {
id: Default::default(),
op: "subscribe".to_string(),
extra: hashmap
};
ctx.notify(subscription);
}
}
#[derive(Message)]
#[rtype(result = "()")]
struct UdpPacket(BytesMut, SocketAddr);
impl StreamHandler<UdpPacket> for
UdpClientActor {
fn handle(&mut self, item: UdpPacket, _ctx: &mut Self::Context) {
println!("Received: ({:?}, {:?})", item.0, item.1);
self.sink.write((item.0.into(), item.1)).unwrap();
}
}
impl actix::io::WriteHandler<std::io::Error> for UdpClientActor {}
impl Handler<Subscribe> for UdpClientActor {
type Result = ();
fn handle(&mut self, msg: Subscribe, _ctx: &mut Self::Context) -> Self::Result {
let js = serde_json::json!(msg).to_string();
let _ = self.sink.write((Bytes::from(msg.to_string()), self.address));
info!("Subscribing to topic {}", js);
}
}
我的主要功能创建 udp 套接字并生成 actor。
fn main() {
////////////////////////////////////////////////////////////////////////////
let fut = async {
////////////////////////////////////////////////////////////////////////////
/////////// UDP_ACTOR
let sock = tokio::net::UdpSocket::bind("0.0.0.0:9091").await.unwrap();
let remote_addr = "172.30.89.169:9091".parse::<SocketAddr>().unwrap();
// let message = b"{ \"op\": \"subscribe\", \"topic\": \"/client_count\"}";
let _ = sock.connect(remote_addr).await;
// sock.send(message).await.unwrap();
let _udp_client = UdpClientActor::start(sock, remote_addr);
};
actix_rt::Arbiter::new().spawn(fut);
// system.block_on(fut);
system.run().unwrap();
}
如果我删除
上的评论
let message = b"{ \"op\": \"subscribe\", \"topic\": \"/client_count\"}";
和
sock.send(message).await.unwrap();
我至少可以检查服务器是否真的可以接收消息。所以我知道问题一定出在我对actor的实现上。我确实有另一个使用 LinesCodec 而不是 BytesCodec 的代码,它遵循完全相同的实现。唯一不同的是 SinkWrite 变成了这样:
SinkWrite<(String, SocketAddr), SplitSink<UdpFramed<codec::LinesCodec>,
(String, SocketAddr)>>
这是我的Cargo.toml供参考。
[package]
name = "local_websocket_client"
version = "0.1.0"
edition = "2018"
[dependencies]
actix="0.12"
actix-codec = "0.4"
actix-rt = "2.5"
bytestring = "1.0"
serde = {version="1.0", features=["serde_derive"]}
log = "0.4"
env_logger = "0.9.0"
chrono = "0.4"
dashmap = "4.0"
futures = "0.3"
openssl = "0.10"
tokio = { version = "1", features = ["full"] }
actix-web = "4.0.0-beta.15"
futures-util = "0.3"
tokio-util = { version="0.6", features=["net", "codec"] }
tokio-udp = "0.1.6"
bytes= { version="0.6", features=["serde"] }
[dependencies.awc]
features = ["openssl"]
version = "3.0.0-beta.9"
[dependencies.serde_json]
features = ["default"]
version = "1.0"
[dependencies.uuid]
features = ["v4", "serde", "v5"]
version = "0.8"
那里有一些额外的板条箱,因为我 运行 2 个其他 websocket 客户端在同一个应用程序上。
我真的很感激能在这件事上提供一些帮助。
谢谢
通过将 UdpSocket 包装在 Arc 中并将引用保留在 actor 中供以后使用来解决。使用套接字写消息是可行的。用于流处理程序的拆分流无需更改,因为它按预期工作。
我正在尝试使用 Actix 编写 Udp 客户端 Actor。我遵循了这个示例,UDP-Echo,但我似乎无法使用 UdpFramed tokio 结构向服务器发送消息。 这是我目前所拥有的,这是 Udp Client Actor 实现
use std::collections::HashMap;
use std::net::{SocketAddr};
use actix_rt::net::UdpSocket;
use actix::{Actor, Addr, AsyncContext, Context, Handler, StreamHandler, Message};
use actix::io::SinkWrite;
use actix_web::web::{Bytes, BytesMut};
use futures_util::stream::{SplitSink};
use futures_util::StreamExt;
use log::info;
use serde_json::Value;
use tokio_util::codec::BytesCodec;
use tokio_util::udp::UdpFramed;
use crate::rosclient::messages::Subscribe;
use std::io::Result;
mod messages;
type SinkItem = (Bytes, SocketAddr);
type UdpSink = SplitSink<UdpFramed<BytesCodec, UdpSocket>, SinkItem>;
pub struct UdpClientActor {
pub address: SocketAddr,
pub sink: SinkWrite<SinkItem, UdpSink>,
}
impl UdpClientActor {
pub fn start(udp: UdpSocket, address: SocketAddr) -> Addr<UdpClientActor> {
let framed = UdpFramed::new(udp, BytesCodec::new());
let (split_sink, split_stream) = framed.split();
UdpClientActor::create(|ctx| {
ctx.add_stream(split_stream.filter_map(
|item: Result<(BytesMut, SocketAddr)>| async {
item.map(|(data, sender)| UdpPacket(data, sender)).ok()
},
));
UdpClientActor {
address,
sink: SinkWrite::new(split_sink, ctx),
}
})
}
}
impl Actor for UdpClientActor {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
let mut hashmap = HashMap::new();
hashmap.insert(String::from("topic"), Value::String(String::from("/client_count")));
let subscription = Subscribe {
id: Default::default(),
op: "subscribe".to_string(),
extra: hashmap
};
ctx.notify(subscription);
}
}
#[derive(Message)]
#[rtype(result = "()")]
struct UdpPacket(BytesMut, SocketAddr);
impl StreamHandler<UdpPacket> for
UdpClientActor {
fn handle(&mut self, item: UdpPacket, _ctx: &mut Self::Context) {
println!("Received: ({:?}, {:?})", item.0, item.1);
self.sink.write((item.0.into(), item.1)).unwrap();
}
}
impl actix::io::WriteHandler<std::io::Error> for UdpClientActor {}
impl Handler<Subscribe> for UdpClientActor {
type Result = ();
fn handle(&mut self, msg: Subscribe, _ctx: &mut Self::Context) -> Self::Result {
let js = serde_json::json!(msg).to_string();
let _ = self.sink.write((Bytes::from(msg.to_string()), self.address));
info!("Subscribing to topic {}", js);
}
}
我的主要功能创建 udp 套接字并生成 actor。
fn main() {
////////////////////////////////////////////////////////////////////////////
let fut = async {
////////////////////////////////////////////////////////////////////////////
/////////// UDP_ACTOR
let sock = tokio::net::UdpSocket::bind("0.0.0.0:9091").await.unwrap();
let remote_addr = "172.30.89.169:9091".parse::<SocketAddr>().unwrap();
// let message = b"{ \"op\": \"subscribe\", \"topic\": \"/client_count\"}";
let _ = sock.connect(remote_addr).await;
// sock.send(message).await.unwrap();
let _udp_client = UdpClientActor::start(sock, remote_addr);
};
actix_rt::Arbiter::new().spawn(fut);
// system.block_on(fut);
system.run().unwrap();
}
如果我删除
上的评论let message = b"{ \"op\": \"subscribe\", \"topic\": \"/client_count\"}";
和
sock.send(message).await.unwrap();
我至少可以检查服务器是否真的可以接收消息。所以我知道问题一定出在我对actor的实现上。我确实有另一个使用 LinesCodec 而不是 BytesCodec 的代码,它遵循完全相同的实现。唯一不同的是 SinkWrite 变成了这样:
SinkWrite<(String, SocketAddr), SplitSink<UdpFramed<codec::LinesCodec>,
(String, SocketAddr)>>
这是我的Cargo.toml供参考。
[package]
name = "local_websocket_client"
version = "0.1.0"
edition = "2018"
[dependencies]
actix="0.12"
actix-codec = "0.4"
actix-rt = "2.5"
bytestring = "1.0"
serde = {version="1.0", features=["serde_derive"]}
log = "0.4"
env_logger = "0.9.0"
chrono = "0.4"
dashmap = "4.0"
futures = "0.3"
openssl = "0.10"
tokio = { version = "1", features = ["full"] }
actix-web = "4.0.0-beta.15"
futures-util = "0.3"
tokio-util = { version="0.6", features=["net", "codec"] }
tokio-udp = "0.1.6"
bytes= { version="0.6", features=["serde"] }
[dependencies.awc]
features = ["openssl"]
version = "3.0.0-beta.9"
[dependencies.serde_json]
features = ["default"]
version = "1.0"
[dependencies.uuid]
features = ["v4", "serde", "v5"]
version = "0.8"
那里有一些额外的板条箱,因为我 运行 2 个其他 websocket 客户端在同一个应用程序上。
我真的很感激能在这件事上提供一些帮助。 谢谢
通过将 UdpSocket 包装在 Arc 中并将引用保留在 actor 中供以后使用来解决。使用套接字写消息是可行的。用于流处理程序的拆分流无需更改,因为它按预期工作。