如何通过futures:stream::Stream发送bytes::bytes::Bytes?

How to send bytes::bytes::Bytes by futures:stream::Stream?

我正在尝试编写基于 Tokio's example 的 TCP 服务器。

当我尝试发送缓冲区时,编译器 returns 错误 0277。

我的代码:(playground)

extern crate tokio; // 0.1.22

use tokio::codec::{BytesCodec, Decoder};
use tokio::net::TcpListener;
use tokio::prelude::*;

use bytes::Bytes; // 0.4.12

fn main() {
    let addr = "0.0.0.0:1502".parse().unwrap();
    let mut socket = TcpListener::bind(&addr).unwrap();

    let done = socket
        .incoming()
        .map_err(|e| println!("failed to accept socket; error = {:?}", e))
        .for_each(move |socket| {
            let framed = BytesCodec::new().framed(socket);
            let (writer, reader) = framed.split();

            let processor = reader
                .for_each(move |bytes| {
                    println!("bytes: {:?}", bytes);
                    let mut data_to_send = [0 as u8; 1024];
                    let buf = Bytes::from(&data_to_send[..1024]);
                    writer.send_all(&mut buf);
                    Ok(())
                })
                .and_then(|()| Ok(()))
                .or_else(|err| Err(err))
                .then(|result| Ok(()));
            tokio::spawn(processor)
        });
    tokio::run(done);
}
error[E0277]: the trait bound `bytes::bytes::Bytes: tokio::prelude::Stream` is not satisfied
  --> src/main.rs:25:28
   |
25 |                     writer.send_all(&mut buf); // ERROR : E0277
   |                            ^^^^^^^^ the trait `tokio::prelude::Stream` is not implemented for `bytes::bytes::Bytes`
   |
   = note: required because of the requirements on the impl of `tokio::prelude::Stream` for `&mut bytes::bytes::Bytes`

我解决了这个问题。

Playground

extern crate tokio; // 0.1.22

use tokio::codec::{BytesCodec, Decoder};
use tokio::net::TcpListener;
use tokio::prelude::*;

use bytes::Bytes; // 0.4.12

fn main() {
    let addr = "0.0.0.0:1502".parse().unwrap();
    let mut socket = TcpListener::bind(&addr).unwrap();

    let done = socket
        .incoming()
        .map_err(|e| println!("failed to accept socket; error = {:?}", e))
        .for_each(move |socket| {
            let framed = BytesCodec::new().framed(socket);
            let (mut writer, reader) = framed.split();

            let processor = reader
                .for_each(move |bytes| {
                    println!("bytes: {:?}", bytes);
                    let mut data_to_send = [0 as u8; 1024];
                    let buf = Bytes::from(&data_to_send[..1024]);
                    writer.start_send(buf).unwrap();
                    writer.poll_complete().unwrap();
                    Ok(())
                })
                .and_then(|()| Ok(()))
                .or_else(|err| Err(err))
                .then(|result| Ok(()));

            tokio::spawn(processor)
        });

    tokio::run(done);
}