从流 "FramedRead" 如何在每个块中 "do something"

from a stream "FramedRead" how to "do something" in every chunk

我想使用 crate indicatif, I am uploading the file asynchronous using reqwest 显示文件的上传进度,如下所示:

use tokio::fs::File;
use tokio_util::codec::{BytesCodec, FramedRead};

let file = File::open(file_path).await?;
let stream = FramedRead::new(file, BytesCodec::new());
let body = Body::wrap_stream(stream);
client.put(url).body(body)

进度条是这样实现的:

use indicatif::ProgressBar;

let bar = ProgressBar::new(1000);
for _ in 0..1000 {
    bar.inc(1);
    // ...
}
bar.finish();

如何从 stream:

let stream = FramedRead::new(file, BytesCodec::new());
// how on every chunk do X ? 
let body = Body::wrap_stream(stream);

我可以在每次互动时调用 bar.inc(1) 吗?

从文档中我看到有一个 read_buffer 但是如何以一种我可以使用它来调用自定义函数或计算我可以显示的 cased 中发送的字节的方式对其进行迭代例如,到目前为止“发送的字节数”。

例如,您可以使用 TryStreamExt::inspect_ok,当该项目被消耗时,它将调用一个引用流中每个 Ok(item) 的闭包。

use futures::stream::TryStreamExt;
use tokio_util::codec::{BytesCodec, FramedRead};

let stream = FramedRead::new(file, BytesCodec::new())
    .inspect_ok(|chunk| {
        // do X with chunk...
    });

let body = Body::wrap_stream(stream);