从 AsyncRead 转发到 futures 0.3 mpsc::UnboundedSender<T> 时的错误处理

Error handling when forwarding from AsyncRead to an futures 0.3 mpsc::UnboundedSender<T>

我想将实现 tokio::io::AsyncReadtokio::process::child::ChildStdout 的输出转发给实现 futures::sink::Sink.

futures::channel::mpsc::UnboundedSender<MyType>

我正在使用生成 MyType 项目的自定义编解码器,但为了忠实于 MRE 中的 M,我将使用 Tokio 的 LinesCodec 并为此说 MyType = String问题。

use futures::StreamExt; // 0.3.8
use tokio; // 1.0.1
use tokio_util; // 0.6.0

#[tokio::main]
pub async fn main() {
    let (mut tx, rx) = futures::channel::mpsc::unbounded::<String>();

    let mut process = tokio::process::Command::new("dmesg")
        .arg("-w")
        .stdout(std::process::Stdio::piped())
        .spawn()
        .unwrap();

    let stdout = process.stdout.take().unwrap();
    let codec = tokio_util::codec::LinesCodec::new();
    let framed_read = tokio_util::codec::FramedRead::new(stdout, codec);

    let forward = framed_read.forward(tx);

    // read from the other end of the channel
    tokio::spawn(async move {
        while let Some(line) = rx.next().await {
            eprintln!("{}", line);
        }
    });

    //forward.await;
}

但是,编译器报告错误类型不匹配:

error[E0271]: type mismatch resolving `<futures::channel::mpsc::UnboundedSender<String> as futures::Sink<String>>::Error == LinesCodecError`
  --> src/main.rs:19:31
   |
19 |     let forward = framed_read.forward(tx);
   |                               ^^^^^^^ expected struct `futures::channel::mpsc::SendError`, found enum `LinesCodecError`

假设我在这里没有做根本性的错误,我怎样才能正确地 handle/convert 这些错误类型?

似乎有一个 但它似乎是针对相反的情况和 futures 0.1 我怀疑它可能已经过时了,因为 Rust 的异步生态系统变化如此之快。

流中的项目可能会失败 (LinesCodecError),将值发送到通道可能会失败 (SendError),但整个转发过程只会导致单一错误类型。

您可以使用SinkExt::sink_err_into and TryStreamExt::err_into将错误转换为兼容的统一类型。在这里,我选择了 Box<dyn Error>:

type Error = Box<dyn std::error::Error>;

let forward = framed_read.err_into::<Error>().forward(tx.sink_err_into::<Error>());

在许多情况下,您会创建自定义错误类型。您也可能不需要像上面的示例那样频繁地使用 turbofish,因为类型推断可能会在某个时候开始。

另请参阅: