How to turn a tokio TcpStream into a Sink/Stream of Serializable/Deserializable values?

I have a tokio TcpStream. I want to pass some type T over this stream. This type T implements Serialize and Deserialize. How can I obtain a Sink<T> and a Stream<T>?

I found the crates tokio_util and tokio_serde, but I can't figure out how to use them to do what I want.

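I don't know your code structure or the codecs you're planning on using, but I've figured out how to glue everything together into a workable example.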

Your Sink<T> and Stream<Item=T> are going to be provided by the Framed type in tokio-serde. This layer deals with passing your messages through serde. This type takes four generic parameters: Transport, Item (the stream item), SinkItem, and Codec. Codec is a wrapper for the specific serializer and deserializer you want to use. The provided options are listed in the tokio_serde::formats module. Item and SinkItem are just going to be your message type, which must implement Serialize and Deserialize. Transport needs to be a Sink<SinkItem> and/or Stream<Item=Item> itself in order for the frame to implement any useful traits.

This is where tokio-util comes in. It provides various Framed* types which allow you to convert things implementing AsyncRead/AsyncWrite into streams and sinks respectively. In order to construct these frames, you need to specify a codec which delimits frames on the wire. For simplicity, my example just uses the provided LengthDelimitedCodec, but other options are provided as well.
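To make "delimiting frames on the wire" concrete, here is a minimal std-only sketch of length-prefixed framing. It assumes the layout that LengthDelimitedCodec uses in its default configuration as far as I know (a 4-byte big-endian length followed by the payload). The names HEADER_LEN, encode_frame and decode_frame are made up for illustration and are not part of tokio-util.

```rust
/// Size of the length prefix in bytes (assumed: 4-byte big-endian header).
const HEADER_LEN: usize = 4;

/// Prefix `payload` with its length so the receiver can find the frame boundary.
fn encode_frame(payload: &[u8]) -> Vec<u8> {
    assert!(payload.len() <= u32::MAX as usize, "payload too large");
    let mut out = Vec::with_capacity(HEADER_LEN + payload.len());
    out.extend_from_slice(&(payload.len() as u32).to_be_bytes());
    out.extend_from_slice(payload);
    out
}

/// Try to split one complete frame off the front of `buf`.
/// Returns the payload and the number of bytes consumed, or `None`
/// when more bytes must arrive before a whole frame is available.
fn decode_frame(buf: &[u8]) -> Option<(&[u8], usize)> {
    if buf.len() < HEADER_LEN {
        return None; // header not complete yet
    }
    let len = u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]) as usize;
    let end = HEADER_LEN.checked_add(len)?;
    if buf.len() < end {
        return None; // payload not complete yet
    }
    Some((&buf[HEADER_LEN..end], end))
}
```

TCP is a byte stream with no message boundaries, so a reader may see half a frame or several frames at once. The codec's job is exactly this buffering and splitting; tokio-serde then only ever sees whole frames to hand to the deserializer.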

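Without further ado, here's an example of how you could take a tokio::net::TcpStream and split it into a Sink<T> and a Stream<Item=T>. Note that the stream side yields a Result around T rather than a bare T, because the serde layer can fail if a message is malformed.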

use futures::{SinkExt, StreamExt};
use serde::{Deserialize, Serialize};
use tokio::net::{
    tcp::{OwnedReadHalf, OwnedWriteHalf},
    TcpListener,
    TcpStream,
};
use tokio_serde::{formats::Json, Framed};
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};

#[derive(Serialize, Deserialize, Debug)]
struct MyMessage {
    field: String,
}

type WrappedStream = FramedRead<OwnedReadHalf, LengthDelimitedCodec>;
type WrappedSink = FramedWrite<OwnedWriteHalf, LengthDelimitedCodec>;

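// Each wrapper handles only one direction of the connection, so the unit
// type stands in for the message type of the unused direction.
// SerStream yields deserialized MyMessage values; DeSink serializes and sends them.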
type SerStream = Framed<WrappedStream, MyMessage, (), Json<MyMessage, ()>>;
type DeSink = Framed<WrappedSink, (), MyMessage, Json<(), MyMessage>>;

fn wrap_stream(stream: TcpStream) -> (SerStream, DeSink) {
    let (read, write) = stream.into_split();
    let stream = WrappedStream::new(read, LengthDelimitedCodec::new());
    let sink = WrappedSink::new(write, LengthDelimitedCodec::new());
    (
        SerStream::new(stream, Json::default()),
        DeSink::new(sink, Json::default()),
    )
}

#[tokio::main]
async fn main() {
    let listener = TcpListener::bind("0.0.0.0:8080")
        .await
        .expect("Failed to bind server to addr");

    tokio::task::spawn(async move {
        let (stream, _) = listener
            .accept()
            .await
            .expect("Failed to accept incoming connection");
        
        let (mut stream, mut sink) = wrap_stream(stream);

        println!(
            "Server received: {:?}",
            stream
                .next()
                .await
                .expect("No data in stream")
                .expect("Failed to parse ping")
        );

        sink.send(MyMessage {
            field: "pong".to_owned(),
        })
            .await
            .expect("Failed to send pong");
    });

    let stream = TcpStream::connect("127.0.0.1:8080")
        .await
        .expect("Failed to connect to server");

    let (mut stream, mut sink) = wrap_stream(stream);

    sink.send(MyMessage {
        field: "ping".to_owned(),
    })
        .await
        .expect("Failed to send ping to server");
        
    println!(
        "Client received: {:?}",
        stream
            .next()
            .await
            .expect("No data in stream")
            .expect("Failed to parse pong")
    );
}

Running this example yields:

Server received: MyMessage { field: "ping" }
Client received: MyMessage { field: "pong" }

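Note that splitting the stream is not required. You could instead construct a tokio_util::codec::Framed out of the TcpStream, then construct a tokio_serde::Framed with a tokio_serde::formats::SymmetricalJson<MyMessage>, and that Framed would implement both Sink and Stream accordingly. Also, a lot of the functionality in this example is feature-gated, so be sure to enable the appropriate features according to the docs.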