从 AsyncRead 转发到 futures 0.3 mpsc::UnboundedSender<T> 时的错误处理
Error handling when forwarding from AsyncRead to an futures 0.3 mpsc::UnboundedSender<T>
我想将实现 tokio::io::AsyncRead
的 tokio::process::child::ChildStdout
的输出转发给实现 futures::sink::Sink
.
的 futures::channel::mpsc::UnboundedSender<MyType>
我正在使用生成 MyType
项目的自定义编解码器,但为了忠实于 MRE 中的 M,我将使用 Tokio 的 LinesCodec
并为此说 MyType = String
问题。
use futures::StreamExt; // 0.3.8
use tokio; // 1.0.1
use tokio_util; // 0.6.0
#[tokio::main]
pub async fn main() {
let (mut tx, rx) = futures::channel::mpsc::unbounded::<String>();
let mut process = tokio::process::Command::new("dmesg")
.arg("-w")
.stdout(std::process::Stdio::piped())
.spawn()
.unwrap();
let stdout = process.stdout.take().unwrap();
let codec = tokio_util::codec::LinesCodec::new();
let framed_read = tokio_util::codec::FramedRead::new(stdout, codec);
let forward = framed_read.forward(tx);
// read from the other end of the channel
tokio::spawn(async move {
while let Some(line) = rx.next().await {
eprintln!("{}", line);
}
});
//forward.await;
}
但是,编译器报告错误类型不匹配:
error[E0271]: type mismatch resolving `<futures::channel::mpsc::UnboundedSender<String> as futures::Sink<String>>::Error == LinesCodecError`
--> src/main.rs:19:31
|
19 | let forward = framed_read.forward(tx);
| ^^^^^^^ expected struct `futures::channel::mpsc::SendError`, found enum `LinesCodecError`
假设我在这里没有做根本性的错误,我怎样才能正确地 handle/convert 这些错误类型?
似乎有一个 但它似乎是针对相反的情况和 futures 0.1 我怀疑它可能已经过时了,因为 Rust 的异步生态系统变化如此之快。
流中的项目可能会失败 (LinesCodecError
),将值发送到通道可能会失败 (SendError
),但整个转发过程只会导致单一错误类型。
您可以使用SinkExt::sink_err_into
and TryStreamExt::err_into
将错误转换为兼容的统一类型。在这里,我选择了 Box<dyn Error>
:
type Error = Box<dyn std::error::Error>;
let forward = framed_read.err_into::<Error>().forward(tx.sink_err_into::<Error>());
在许多情况下,您会创建自定义错误类型。您也可能不需要像上面的示例那样频繁地使用 turbofish,因为类型推断可能会在某个时候开始。
另请参阅:
我想将实现 tokio::io::AsyncRead
的 tokio::process::child::ChildStdout
的输出转发给实现 futures::sink::Sink
.
futures::channel::mpsc::UnboundedSender<MyType>
我正在使用生成 MyType
项目的自定义编解码器,但为了忠实于 MRE 中的 M,我将使用 Tokio 的 LinesCodec
并为此说 MyType = String
问题。
use futures::StreamExt; // 0.3.8
use tokio; // 1.0.1
use tokio_util; // 0.6.0
#[tokio::main]
pub async fn main() {
let (mut tx, rx) = futures::channel::mpsc::unbounded::<String>();
let mut process = tokio::process::Command::new("dmesg")
.arg("-w")
.stdout(std::process::Stdio::piped())
.spawn()
.unwrap();
let stdout = process.stdout.take().unwrap();
let codec = tokio_util::codec::LinesCodec::new();
let framed_read = tokio_util::codec::FramedRead::new(stdout, codec);
let forward = framed_read.forward(tx);
// read from the other end of the channel
tokio::spawn(async move {
while let Some(line) = rx.next().await {
eprintln!("{}", line);
}
});
//forward.await;
}
但是,编译器报告错误类型不匹配:
error[E0271]: type mismatch resolving `<futures::channel::mpsc::UnboundedSender<String> as futures::Sink<String>>::Error == LinesCodecError`
--> src/main.rs:19:31
|
19 | let forward = framed_read.forward(tx);
| ^^^^^^^ expected struct `futures::channel::mpsc::SendError`, found enum `LinesCodecError`
假设我在这里没有做根本性的错误,我怎样才能正确地 handle/convert 这些错误类型?
似乎有一个
流中的项目可能会失败 (LinesCodecError
),将值发送到通道可能会失败 (SendError
),但整个转发过程只会导致单一错误类型。
您可以使用SinkExt::sink_err_into
and TryStreamExt::err_into
将错误转换为兼容的统一类型。在这里,我选择了 Box<dyn Error>
:
type Error = Box<dyn std::error::Error>;
let forward = framed_read.err_into::<Error>().forward(tx.sink_err_into::<Error>());
在许多情况下,您会创建自定义错误类型。您也可能不需要像上面的示例那样频繁地使用 turbofish,因为类型推断可能会在某个时候开始。
另请参阅: