Tokio FramedRead.for_each 无限期调用单一响应
Tokio FramedRead.for_each called indefinitely for single response
为了使用 tokio_uds 编写协议,我已经研究了 tokio 几个星期。以下代码存在几个问题:
framed.for_each
从单个响应中一遍又一遍地调用。
套接字仅发送 1 条真实消息,但 Decoder
会尽可能多地解码完全相同的事件,直到填满有界通道。
通道上没有收到任何东西(rx.for_each
从不打印任何东西),尽管它似乎一直在写入直到填满。
我需要使用 UnixStream 而不是 UnixListener,因为我必须先通过套接字将一些数据 'subscribe' 传送到服务并让它知道要发送什么。
use byteorder::{ByteOrder, LittleEndian};
use bytes::{Buf, BufMut, Bytes, BytesMut, IntoBuf};
use futures::prelude::*;
use futures::sync::mpsc::{self, Receiver, Sender};
use futures::Stream;
use tokio::prelude::*;
use tokio_codec::{Decoder, Encoder, FramedRead};
use tokio_uds::UnixStream;
fn subscribe(tx: Sender<event::Evt>, events: Vec<Event>) -> io::Result<()> {
let fut = UnixStream::connect(socket_path()?)
.and_then(move |stream| {
// some setup
tokio::io::write_all(stream, buf)
})
.and_then(|(stream, _buf)| {
let buf = [0_u8; 30]; // <i3-ipc (6 bytes)><len (4 bytes)><type (4 bytes)><{success:true} 16 bytes>
tokio::io::read_exact(stream, buf)
})
.and_then(|(stream, initial)| {
if &initial[0..6] != MAGIC.as_bytes() {
panic!("Magic str not received");
}
// decoding initial response and returning stream
future::ok(stream)
})
.and_then(move |stream| {
let framed = FramedRead::new(stream, EvtCodec);
let sender = framed
.for_each(move |evt| {
let tx = tx.clone();
tx.send(evt).wait(); // this line is called continuously until buffer fills
Ok(())
})
.map_err(|err| println!("{}", err));
tokio::spawn(sender);
Ok(())
})
.map(|_| ())
.map_err(|e| eprintln!("{:?}", e));
tokio::run(fut);
Ok(())
}
fn test_sub() -> io::Result<()> {
let (tx, rx) = mpsc::channel(5);
subscribe(tx, vec![Event::Window])?;
let fut = rx.for_each(|e: event::Evt| {
println!("received"); // never reaches
future::ok(())
});
tokio::spawn(fut);
Ok(())
}
我的Decoder
:
pub struct EvtCodec;
/// decoding: "<i3-ipc><payload len: u32><msg type: u32><payload>"
impl Decoder for EvtCodec {
type Item = event::Evt;
type Error = io::Error;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, io::Error> {
if src.len() > 14 {
if &src[0..6] != MAGIC.as_bytes() {
return Err(io::Error::new(
io::ErrorKind::Other,
format!("Expected 'i3-ipc' but received: {:?}", &src[0..6]),
));
}
let payload_len = LittleEndian::read_u32(&src[6..10]) as usize;
let evt_type = LittleEndian::read_u32(&src[10..14]);
dbg!(&src.len()); // 878
dbg!(payload_len); // 864
if src.len() < 14 + payload_len {
Ok(None)
} else {
let evt = decode_evt(evt_type, src[14..].as_mut().to_vec())?;
dbg!(&evt); // correctly prints out a well-formed event
Ok(Some(evt))
}
} else {
Ok(None)
}
}
}
我看到您解决了其他问题,我很想知道您是如何解决这个问题的。以下是我在我的 TCP Tokio 副项目中修复它的方法:
use byteorder::{ByteOrder, LittleEndian};
use bytes::{Buf, BufMut, Bytes, BytesMut, IntoBuf};
use futures::prelude::*;
use futures::sync::mpsc::{self, Receiver, Sender};
use futures::Stream;
use tokio::prelude::*;
use tokio_codec::{Decoder, Encoder, FramedRead};
use tokio_uds::UnixStream;
fn subscribe(tx: Sender<event::Evt>, rx: Receiver<event::Evt>, events: Vec<Event>) -> io::Result<()> {
let fut = UnixStream::connect(socket_path()?)
.and_then(move |stream| {
// some setup
tokio::io::write_all(stream, buf)
})
.and_then(|(stream, _buf)| {
let buf = [0_u8; 30]; // <i3-ipc (6 bytes)><len (4 bytes)><type (4 bytes)><{success:true} 16 bytes>
tokio::io::read_exact(stream, buf)
})
.and_then(|(stream, initial)| {
if &initial[0..6] != MAGIC.as_bytes() {
panic!("Magic str not received");
}
// decoding initial response and returning stream
future::ok(stream)
})
.and_then(move |stream| {
let framed = FramedRead::new(stream, EvtCodec);
let (writer, reader) = framed.split();
// Connect your framed reader to the channel
let sink = rx.forward(writer.sink_map_err(|_| ()));
tokio::spawn(sink.map(|_| ()));
let sender = reader
.for_each(move |evt| {
let tx = tx.clone();
tx.send(evt).wait(); // this line is called continuously until buffer fills
Ok(())
})
.map_err(|err| println!("{}", err));
tokio::spawn(sender);
Ok(())
})
.map(|_| ())
.map_err(|e| eprintln!("{:?}", e));
tokio::run(fut);
Ok(())
}
fn test_sub() -> io::Result<()> {
let (tx, rx) = mpsc::channel(5);
subscribe(tx, rx, vec![Event::Window])?;
let fut = rx.for_each(|e: event::Evt| {
println!("received"); // never reaches
future::ok(())
});
tokio::spawn(fut);
Ok(())
}
并且缓冲区清除的解码器:
pub struct EvtCodec;
/// decoding: "<i3-ipc><payload len: u32><msg type: u32><payload>"
impl Decoder for EvtCodec {
type Item = event::Evt;
type Error = io::Error;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, io::Error> {
if src.len() > 14 {
if &src[0..6] != MAGIC.as_bytes() {
return Err(io::Error::new(
io::ErrorKind::Other,
format!("Expected 'i3-ipc' but received: {:?}", &src[0..6]),
));
}
let payload_len = LittleEndian::read_u32(&src[6..10]) as usize;
let evt_type = LittleEndian::read_u32(&src[10..14]);
dbg!(&src.len()); // 878
dbg!(payload_len); // 864
if src.len() < 14 + payload_len {
Ok(None)
} else {
let evt = decode_evt(evt_type, src[14..].as_mut().to_vec())?;
dbg!(&evt); // correctly prints out a well-formed event
src.clear(); // Clears the buffer, so you don't have to keep decoding the same packet over and over.
Ok(Some(evt))
}
} else {
Ok(None)
}
}
}
希望对您有所帮助!
编辑:
根据 rust subreddit 上的一位用户在我将此解决方案包含在博客 post 后发表的评论,src.clear()
对我来说可能是错误的答案。我应该改用 `src.advance(14+payload_len)
链接reddit评论here
为了使用 tokio_uds 编写协议,我已经研究了 tokio 几个星期。以下代码存在几个问题:
framed.for_each
从单个响应中一遍又一遍地调用。
套接字仅发送 1 条真实消息,但 Decoder
会尽可能多地解码完全相同的事件,直到填满有界通道。
通道上没有收到任何东西(rx.for_each
从不打印任何东西),尽管它似乎一直在写入直到填满。
我需要使用 UnixStream 而不是 UnixListener,因为我必须先通过套接字将一些数据 'subscribe' 传送到服务并让它知道要发送什么。
use byteorder::{ByteOrder, LittleEndian};
use bytes::{Buf, BufMut, Bytes, BytesMut, IntoBuf};
use futures::prelude::*;
use futures::sync::mpsc::{self, Receiver, Sender};
use futures::Stream;
use tokio::prelude::*;
use tokio_codec::{Decoder, Encoder, FramedRead};
use tokio_uds::UnixStream;
fn subscribe(tx: Sender<event::Evt>, events: Vec<Event>) -> io::Result<()> {
let fut = UnixStream::connect(socket_path()?)
.and_then(move |stream| {
// some setup
tokio::io::write_all(stream, buf)
})
.and_then(|(stream, _buf)| {
let buf = [0_u8; 30]; // <i3-ipc (6 bytes)><len (4 bytes)><type (4 bytes)><{success:true} 16 bytes>
tokio::io::read_exact(stream, buf)
})
.and_then(|(stream, initial)| {
if &initial[0..6] != MAGIC.as_bytes() {
panic!("Magic str not received");
}
// decoding initial response and returning stream
future::ok(stream)
})
.and_then(move |stream| {
let framed = FramedRead::new(stream, EvtCodec);
let sender = framed
.for_each(move |evt| {
let tx = tx.clone();
tx.send(evt).wait(); // this line is called continuously until buffer fills
Ok(())
})
.map_err(|err| println!("{}", err));
tokio::spawn(sender);
Ok(())
})
.map(|_| ())
.map_err(|e| eprintln!("{:?}", e));
tokio::run(fut);
Ok(())
}
fn test_sub() -> io::Result<()> {
let (tx, rx) = mpsc::channel(5);
subscribe(tx, vec![Event::Window])?;
let fut = rx.for_each(|e: event::Evt| {
println!("received"); // never reaches
future::ok(())
});
tokio::spawn(fut);
Ok(())
}
我的Decoder
:
pub struct EvtCodec;
/// decoding: "<i3-ipc><payload len: u32><msg type: u32><payload>"
impl Decoder for EvtCodec {
type Item = event::Evt;
type Error = io::Error;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, io::Error> {
if src.len() > 14 {
if &src[0..6] != MAGIC.as_bytes() {
return Err(io::Error::new(
io::ErrorKind::Other,
format!("Expected 'i3-ipc' but received: {:?}", &src[0..6]),
));
}
let payload_len = LittleEndian::read_u32(&src[6..10]) as usize;
let evt_type = LittleEndian::read_u32(&src[10..14]);
dbg!(&src.len()); // 878
dbg!(payload_len); // 864
if src.len() < 14 + payload_len {
Ok(None)
} else {
let evt = decode_evt(evt_type, src[14..].as_mut().to_vec())?;
dbg!(&evt); // correctly prints out a well-formed event
Ok(Some(evt))
}
} else {
Ok(None)
}
}
}
我看到您解决了其他问题,我很想知道您是如何解决这个问题的。以下是我在我的 TCP Tokio 副项目中修复它的方法:
use byteorder::{ByteOrder, LittleEndian};
use bytes::{Buf, BufMut, Bytes, BytesMut, IntoBuf};
use futures::prelude::*;
use futures::sync::mpsc::{self, Receiver, Sender};
use futures::Stream;
use tokio::prelude::*;
use tokio_codec::{Decoder, Encoder, FramedRead};
use tokio_uds::UnixStream;
fn subscribe(tx: Sender<event::Evt>, rx: Receiver<event::Evt>, events: Vec<Event>) -> io::Result<()> {
let fut = UnixStream::connect(socket_path()?)
.and_then(move |stream| {
// some setup
tokio::io::write_all(stream, buf)
})
.and_then(|(stream, _buf)| {
let buf = [0_u8; 30]; // <i3-ipc (6 bytes)><len (4 bytes)><type (4 bytes)><{success:true} 16 bytes>
tokio::io::read_exact(stream, buf)
})
.and_then(|(stream, initial)| {
if &initial[0..6] != MAGIC.as_bytes() {
panic!("Magic str not received");
}
// decoding initial response and returning stream
future::ok(stream)
})
.and_then(move |stream| {
let framed = FramedRead::new(stream, EvtCodec);
let (writer, reader) = framed.split();
// Connect your framed reader to the channel
let sink = rx.forward(writer.sink_map_err(|_| ()));
tokio::spawn(sink.map(|_| ()));
let sender = reader
.for_each(move |evt| {
let tx = tx.clone();
tx.send(evt).wait(); // this line is called continuously until buffer fills
Ok(())
})
.map_err(|err| println!("{}", err));
tokio::spawn(sender);
Ok(())
})
.map(|_| ())
.map_err(|e| eprintln!("{:?}", e));
tokio::run(fut);
Ok(())
}
fn test_sub() -> io::Result<()> {
let (tx, rx) = mpsc::channel(5);
subscribe(tx, rx, vec![Event::Window])?;
let fut = rx.for_each(|e: event::Evt| {
println!("received"); // never reaches
future::ok(())
});
tokio::spawn(fut);
Ok(())
}
并且缓冲区清除的解码器:
pub struct EvtCodec;
/// decoding: "<i3-ipc><payload len: u32><msg type: u32><payload>"
impl Decoder for EvtCodec {
type Item = event::Evt;
type Error = io::Error;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, io::Error> {
if src.len() > 14 {
if &src[0..6] != MAGIC.as_bytes() {
return Err(io::Error::new(
io::ErrorKind::Other,
format!("Expected 'i3-ipc' but received: {:?}", &src[0..6]),
));
}
let payload_len = LittleEndian::read_u32(&src[6..10]) as usize;
let evt_type = LittleEndian::read_u32(&src[10..14]);
dbg!(&src.len()); // 878
dbg!(payload_len); // 864
if src.len() < 14 + payload_len {
Ok(None)
} else {
let evt = decode_evt(evt_type, src[14..].as_mut().to_vec())?;
dbg!(&evt); // correctly prints out a well-formed event
src.clear(); // Clears the buffer, so you don't have to keep decoding the same packet over and over.
Ok(Some(evt))
}
} else {
Ok(None)
}
}
}
希望对您有所帮助!
编辑:
根据 rust subreddit 上的一位用户在我将此解决方案包含在博客 post 后发表的评论,src.clear()
对我来说可能是错误的答案。我应该改用 `src.advance(14+payload_len)
链接reddit评论here