如何通过 futures::stream::Stream 发送 bytes::bytes::Bytes?
How to send bytes::bytes::Bytes through futures::stream::Stream?
我正在尝试基于 Tokio 的示例编写一个 TCP 服务器。
当我尝试发送缓冲区时,编译器返回错误 E0277。
我的代码:(playground)
extern crate tokio; // 0.1.22
use tokio::codec::{BytesCodec, Decoder};
use tokio::net::TcpListener;
use tokio::prelude::*;
use bytes::Bytes; // 0.4.12
// Question code: a TCP server on 0.0.0.0:1502 that tries to answer every
// chunk it receives with a 1024-byte zero-filled buffer.
// As posted, it does NOT compile: the `send_all` call below is rejected
// with error E0277.
fn main() {
let addr = "0.0.0.0:1502".parse().unwrap();
let mut socket = TcpListener::bind(&addr).unwrap();
// Stream of accepted connections; accept errors are printed and mapped to `()`.
let done = socket
.incoming()
.map_err(|e| println!("failed to accept socket; error = {:?}", e))
.for_each(move |socket| {
// Frame the connection with `BytesCodec` and split it into a write half
// and a read half.
let framed = BytesCodec::new().framed(socket);
let (writer, reader) = framed.split();
let processor = reader
.for_each(move |bytes| {
println!("bytes: {:?}", bytes);
// Reply payload: 1024 zero bytes copied into a `Bytes` buffer.
let mut data_to_send = [0 as u8; 1024];
let buf = Bytes::from(&data_to_send[..1024]);
// `send_all` requires its argument to implement `Stream`; `Bytes` (and
// therefore `&mut Bytes`) does not, which is what the compiler reports.
writer.send_all(&mut buf); // ERROR : E0277
Ok(())
})
// The next two adapters pass the value / error through unchanged.
.and_then(|()| Ok(()))
.or_else(|err| Err(err))
// Discards the outcome of the connection, success or error alike.
.then(|result| Ok(()));
// Run the per-connection future as its own task.
tokio::spawn(processor)
});
tokio::run(done);
}
error[E0277]: the trait bound `bytes::bytes::Bytes: tokio::prelude::Stream` is not satisfied
--> src/main.rs:25:28
|
25 | writer.send_all(&mut buf); // ERROR : E0277
| ^^^^^^^^ the trait `tokio::prelude::Stream` is not implemented for `bytes::bytes::Bytes`
|
= note: required because of the requirements on the impl of `tokio::prelude::Stream` for `&mut bytes::bytes::Bytes`
我解决了这个问题:`send_all` 的参数必须实现 `Stream`,而单个 `Bytes` 并没有实现它。修改后的代码如下:
extern crate tokio; // 0.1.22
use tokio::codec::{BytesCodec, Decoder};
use tokio::net::TcpListener;
use tokio::prelude::*;
use bytes::Bytes; // 0.4.12
/// Runs a TCP server on `0.0.0.0:1502` that answers every chunk received
/// from a client with a 1024-byte zero-filled buffer.
///
/// # Panics
///
/// Panics if the listen address cannot be parsed or the listener cannot be
/// bound. Per-connection I/O errors are printed and end only that connection.
fn main() {
    let addr = "0.0.0.0:1502"
        .parse()
        .expect("listen address literal must be a valid socket address");
    let listener = TcpListener::bind(&addr).expect("failed to bind TCP listener");

    let done = listener
        .incoming()
        .map_err(|e| println!("failed to accept socket; error = {:?}", e))
        .for_each(|socket| {
            // Frame the connection with `BytesCodec` and split it into a
            // write half (sink of `Bytes`) and a read half (stream of chunks).
            let framed = BytesCodec::new().framed(socket);
            let (writer, reader) = framed.split();

            // `send_all` takes a `Stream` of items to send, not a single
            // `Bytes`, so map every received chunk to the reply buffer.
            let responses = reader.map(|bytes| {
                println!("bytes: {:?}", bytes);
                let data_to_send = [0u8; 1024];
                Bytes::from(&data_to_send[..])
            });

            // Let `send_all` drive the sink instead of calling `start_send` /
            // `poll_complete` by hand: those return `NotReady` when the sink
            // cannot accept or flush yet, and unwrapping them both ignores
            // that case and panics on ordinary I/O errors.
            let processor = writer
                .send_all(responses)
                .map(|_| ())
                .map_err(|e| println!("connection error = {:?}", e));

            // Run the per-connection future as its own task.
            tokio::spawn(processor)
        });

    tokio::run(done);
}
我正在尝试基于 Tokio 的示例编写一个 TCP 服务器。
当我尝试发送缓冲区时,编译器返回错误 E0277。
我的代码:(playground)
extern crate tokio; // 0.1.22
use tokio::codec::{BytesCodec, Decoder};
use tokio::net::TcpListener;
use tokio::prelude::*;
use bytes::Bytes; // 0.4.12
// Question code: a TCP server on 0.0.0.0:1502 that tries to answer every
// chunk it receives with a 1024-byte zero-filled buffer.
// As posted, it does NOT compile: the `send_all` call below is rejected
// with error E0277.
fn main() {
let addr = "0.0.0.0:1502".parse().unwrap();
let mut socket = TcpListener::bind(&addr).unwrap();
// Stream of accepted connections; accept errors are printed and mapped to `()`.
let done = socket
.incoming()
.map_err(|e| println!("failed to accept socket; error = {:?}", e))
.for_each(move |socket| {
// Frame the connection with `BytesCodec` and split it into a write half
// and a read half.
let framed = BytesCodec::new().framed(socket);
let (writer, reader) = framed.split();
let processor = reader
.for_each(move |bytes| {
println!("bytes: {:?}", bytes);
// Reply payload: 1024 zero bytes copied into a `Bytes` buffer.
let mut data_to_send = [0 as u8; 1024];
let buf = Bytes::from(&data_to_send[..1024]);
// `send_all` requires its argument to implement `Stream`; `Bytes` (and
// therefore `&mut Bytes`) does not, which is what the compiler reports.
writer.send_all(&mut buf); // ERROR : E0277
Ok(())
})
// The next two adapters pass the value / error through unchanged.
.and_then(|()| Ok(()))
.or_else(|err| Err(err))
// Discards the outcome of the connection, success or error alike.
.then(|result| Ok(()));
// Run the per-connection future as its own task.
tokio::spawn(processor)
});
tokio::run(done);
}
error[E0277]: the trait bound `bytes::bytes::Bytes: tokio::prelude::Stream` is not satisfied
--> src/main.rs:25:28
|
25 | writer.send_all(&mut buf); // ERROR : E0277
| ^^^^^^^^ the trait `tokio::prelude::Stream` is not implemented for `bytes::bytes::Bytes`
|
= note: required because of the requirements on the impl of `tokio::prelude::Stream` for `&mut bytes::bytes::Bytes`
我解决了这个问题:`send_all` 的参数必须实现 `Stream`,而单个 `Bytes` 并没有实现它。修改后的代码如下:
extern crate tokio; // 0.1.22
use tokio::codec::{BytesCodec, Decoder};
use tokio::net::TcpListener;
use tokio::prelude::*;
use bytes::Bytes; // 0.4.12
/// Runs a TCP server on `0.0.0.0:1502` that answers every chunk received
/// from a client with a 1024-byte zero-filled buffer.
///
/// # Panics
///
/// Panics if the listen address cannot be parsed or the listener cannot be
/// bound. Per-connection I/O errors are printed and end only that connection.
fn main() {
    let addr = "0.0.0.0:1502"
        .parse()
        .expect("listen address literal must be a valid socket address");
    let listener = TcpListener::bind(&addr).expect("failed to bind TCP listener");

    let done = listener
        .incoming()
        .map_err(|e| println!("failed to accept socket; error = {:?}", e))
        .for_each(|socket| {
            // Frame the connection with `BytesCodec` and split it into a
            // write half (sink of `Bytes`) and a read half (stream of chunks).
            let framed = BytesCodec::new().framed(socket);
            let (writer, reader) = framed.split();

            // `send_all` takes a `Stream` of items to send, not a single
            // `Bytes`, so map every received chunk to the reply buffer.
            let responses = reader.map(|bytes| {
                println!("bytes: {:?}", bytes);
                let data_to_send = [0u8; 1024];
                Bytes::from(&data_to_send[..])
            });

            // Let `send_all` drive the sink instead of calling `start_send` /
            // `poll_complete` by hand: those return `NotReady` when the sink
            // cannot accept or flush yet, and unwrapping them both ignores
            // that case and panics on ordinary I/O errors.
            let processor = writer
                .send_all(responses)
                .map(|_| ())
                .map_err(|e| println!("connection error = {:?}", e));

            // Run the per-connection future as its own task.
            tokio::spawn(processor)
        });

    tokio::run(done);
}