UdpFramed 与 Actix Rust。无法使用 SinkWrite 发送消息

UdpFramed with Actix Rust. Can't send messages using SinkWrite

我正在尝试使用 Actix 编写 Udp 客户端 Actor。我遵循了这个示例,UDP-Echo,但我似乎无法使用 UdpFramed tokio 结构向服务器发送消息。 这是我目前所拥有的,这是 Udp Client Actor 实现

use std::collections::HashMap;
use std::net::{SocketAddr};
use actix_rt::net::UdpSocket;
use actix::{Actor, Addr, AsyncContext, Context, Handler, StreamHandler, Message};
use actix::io::SinkWrite;
use actix_web::web::{Bytes, BytesMut};
use futures_util::stream::{SplitSink};
use futures_util::StreamExt;
use log::info;
use serde_json::Value;
use tokio_util::codec::BytesCodec;
use tokio_util::udp::UdpFramed;
use crate::rosclient::messages::Subscribe;
use std::io::Result;
mod messages;



type SinkItem = (Bytes, SocketAddr);
type UdpSink = SplitSink<UdpFramed<BytesCodec, UdpSocket>, SinkItem>;

pub struct UdpClientActor {
    pub address: SocketAddr,
    pub sink: SinkWrite<SinkItem, UdpSink>,
}

impl UdpClientActor {
    pub fn start(udp: UdpSocket, address: SocketAddr) -> Addr<UdpClientActor> {

        let framed = UdpFramed::new(udp, BytesCodec::new());

        let (split_sink, split_stream) = framed.split();

        UdpClientActor::create(|ctx| {
            ctx.add_stream(split_stream.filter_map(
                |item: Result<(BytesMut, SocketAddr)>| async {
                    item.map(|(data, sender)| UdpPacket(data, sender)).ok()
                },
            ));

            UdpClientActor {
                address,
                sink: SinkWrite::new(split_sink, ctx),
            }
        })
    }

}

impl Actor for UdpClientActor {
    type Context = Context<Self>;

    fn started(&mut self, ctx: &mut Self::Context) {
        let mut hashmap = HashMap::new();
        hashmap.insert(String::from("topic"), Value::String(String::from("/client_count")));
        let subscription = Subscribe {
            id: Default::default(),
            op: "subscribe".to_string(),

            extra: hashmap
        };
        ctx.notify(subscription);
    }
}

#[derive(Message)]
#[rtype(result = "()")]
struct UdpPacket(BytesMut, SocketAddr);

impl StreamHandler<UdpPacket> for
UdpClientActor {
    fn handle(&mut self, item: UdpPacket, _ctx: &mut Self::Context) {
        println!("Received: ({:?}, {:?})", item.0, item.1);
        self.sink.write((item.0.into(), item.1)).unwrap();
    }
}

impl actix::io::WriteHandler<std::io::Error> for UdpClientActor {}

impl Handler<Subscribe> for UdpClientActor {
    type Result = ();

    fn handle(&mut self, msg: Subscribe, _ctx: &mut Self::Context) -> Self::Result {
        let js = serde_json::json!(msg).to_string();
        let _ = self.sink.write((Bytes::from(msg.to_string()), self.address));

        info!("Subscribing to topic {}", js);
    }
}

我的主要功能创建 udp 套接字并生成 actor。

fn main() {
    ////////////////////////////////////////////////////////////////////////////
   
    let fut = async {

        ////////////////////////////////////////////////////////////////////////////
        /////////// UDP_ACTOR
        let sock = tokio::net::UdpSocket::bind("0.0.0.0:9091").await.unwrap();
        let remote_addr = "172.30.89.169:9091".parse::<SocketAddr>().unwrap();
        // let message = b"{ \"op\": \"subscribe\", \"topic\": \"/client_count\"}";
        let _ = sock.connect(remote_addr).await;
        // sock.send(message).await.unwrap();

        let _udp_client = UdpClientActor::start(sock, remote_addr);
    };
    actix_rt::Arbiter::new().spawn(fut);
    // system.block_on(fut);
    system.run().unwrap();
}

如果我删除

上的评论
let message = b"{ \"op\": \"subscribe\", \"topic\": \"/client_count\"}";

sock.send(message).await.unwrap();

我至少可以检查服务器是否真的可以接收消息。所以我知道问题一定出在我对actor的实现上。我确实有另一个使用 LinesCodec 而不是 BytesCodec 的代码,它遵循完全相同的实现。唯一不同的是 SinkWrite 变成了这样:

SinkWrite<(String, SocketAddr), SplitSink<UdpFramed<codec::LinesCodec>,
        (String, SocketAddr)>>

这是我的Cargo.toml供参考。

[package]
name = "local_websocket_client"
version = "0.1.0"
edition = "2018"

[dependencies]
actix="0.12"
actix-codec = "0.4"
actix-rt = "2.5"
bytestring = "1.0"
serde = {version="1.0", features=["serde_derive"]}
log = "0.4"
env_logger = "0.9.0"
chrono = "0.4"
dashmap = "4.0"
futures = "0.3"
openssl = "0.10"
tokio = { version = "1", features = ["full"] }
actix-web = "4.0.0-beta.15"
futures-util = "0.3"
tokio-util = { version="0.6", features=["net", "codec"] }
tokio-udp = "0.1.6"
bytes= { version="0.6", features=["serde"] }
[dependencies.awc]
features = ["openssl"]
version = "3.0.0-beta.9"

[dependencies.serde_json]
features = ["default"]
version = "1.0"

[dependencies.uuid]
features = ["v4", "serde", "v5"]
version = "0.8"

那里有一些额外的板条箱,因为我 运行 2 个其他 websocket 客户端在同一个应用程序上。

我真的很感激能在这件事上提供一些帮助。 谢谢

通过将 UdpSocket 包装在 Arc 中并将引用保留在 actor 中供以后使用来解决。使用套接字写消息是可行的。用于流处理程序的拆分流无需更改,因为它按预期工作。