我如何在 Rust 中使用 `flatmap` 流?
How can I `flatmap` streams in Rust?
我有一个 rusoto_core::ByteStream
which implements futures' Stream
trait:
let chunks = vec![b"1234".to_vec(), b"5678".to_vec()];
let stream = ByteStream::new(stream::iter_ok(chunks));
我想将它传递给 actix_web's HttpResponseBuilder::streaming
方法。
use actix_web::dev::HttpResponseBuilder; // 0.7.18
use rusoto_core::ByteStream; // 0.36.0
fn example(stream: ByteStream, builder: HttpResponseBuilder) {
builder.streaming(stream);
}
当我尝试这样做时,我收到以下错误:
error[E0271]: type mismatch resolving `<rusoto_core::stream::ByteStream as futures::stream::Stream>::Item == bytes::bytes::Bytes`
--> src/main.rs:5:13
|
5 | builder.streaming(stream);
| ^^^^^^^^^ expected struct `std::vec::Vec`, found struct `bytes::bytes::Bytes`
|
= note: expected type `std::vec::Vec<u8>`
found type `bytes::bytes::Bytes`
我认为原因是 streaming()
需要 S: Stream<Item = Bytes, Error>
(即 Item = Bytes
),但我的 ByteStream
有 Item = Vec<u8>
。我该如何解决?
我认为解决方案是 flatmap
我的 ByteStream
不知何故,但我找不到这样的流方法。
下面是如何使用 streaming()
的示例:
let text = "123";
let (tx, rx_body) = mpsc::unbounded();
let _ = tx.unbounded_send(Bytes::from(text.as_bytes()));
HttpResponse::Ok()
.streaming(rx_body.map_err(|e| error::ErrorBadRequest("bad request")))
How can I flatmap
streams in Rust?
平面映射将迭代器的迭代器转换为单个迭代器(或流而不是迭代器)。
期货 0.3
Futures 0.3 没有直接平面图,但是有 StreamExt::flatten
, which can be used after a StreamExt::map
.
use futures::{stream, Stream, StreamExt}; // 0.3.1
fn into_many(i: i32) -> impl Stream<Item = i32> {
stream::iter(0..i)
}
fn nested() -> impl Stream<Item = i32> {
let stream_of_number = into_many(5);
let stream_of_stream_of_number = stream_of_number.map(into_many);
let flat_stream_of_number = stream_of_stream_of_number.flatten();
// Returns: 0, 0, 1, 0, 1, 2, 0, 1, 2, 3
flat_stream_of_number
}
期货 0.1
Futures 0.1 没有直接平面图,但是有Stream::flatten
, which can be used after a Stream::map
.
use futures::{stream, Stream}; // 0.1.25
fn into_many(i: i32) -> impl Stream<Item = i32, Error = ()> {
stream::iter_ok(0..i)
}
fn nested() -> impl Stream<Item = i32, Error = ()> {
let stream_of_number = into_many(5);
let stream_of_stream_of_number = stream_of_number.map(into_many);
let flat_stream_of_number = stream_of_stream_of_number.flatten();
// Returns: 0, 0, 1, 0, 1, 2, 0, 1, 2, 3
flat_stream_of_number
}
但是,这并不能解决您的问题。
streaming()
expects a S: Stream<Item = Bytes, Error>
(i.e., Item = Bytes
) but my ByteStream
has Item = Vec<u8>
是的,这就是问题所在。使用 Bytes::from
via Stream::map
将流 Item
从一种类型转换为另一种类型:
use bytes::Bytes; // 0.4.11
use futures::Stream; // 0.1.25
fn example(stream: ByteStream, mut builder: HttpResponseBuilder) {
builder.streaming(stream.map(Bytes::from));
}
我有一个 rusoto_core::ByteStream
which implements futures' Stream
trait:
let chunks = vec![b"1234".to_vec(), b"5678".to_vec()];
let stream = ByteStream::new(stream::iter_ok(chunks));
我想将它传递给 actix_web's HttpResponseBuilder::streaming
方法。
use actix_web::dev::HttpResponseBuilder; // 0.7.18
use rusoto_core::ByteStream; // 0.36.0
fn example(stream: ByteStream, builder: HttpResponseBuilder) {
builder.streaming(stream);
}
当我尝试这样做时,我收到以下错误:
error[E0271]: type mismatch resolving `<rusoto_core::stream::ByteStream as futures::stream::Stream>::Item == bytes::bytes::Bytes`
--> src/main.rs:5:13
|
5 | builder.streaming(stream);
| ^^^^^^^^^ expected struct `std::vec::Vec`, found struct `bytes::bytes::Bytes`
|
= note: expected type `std::vec::Vec<u8>`
found type `bytes::bytes::Bytes`
我认为原因是 streaming()
需要 S: Stream<Item = Bytes, Error>
(即 Item = Bytes
),但我的 ByteStream
有 Item = Vec<u8>
。我该如何解决?
我认为解决方案是 flatmap
我的 ByteStream
不知何故,但我找不到这样的流方法。
下面是如何使用 streaming()
的示例:
let text = "123";
let (tx, rx_body) = mpsc::unbounded();
let _ = tx.unbounded_send(Bytes::from(text.as_bytes()));
HttpResponse::Ok()
.streaming(rx_body.map_err(|e| error::ErrorBadRequest("bad request")))
How can I
flatmap
streams in Rust?
平面映射将迭代器的迭代器转换为单个迭代器(或流而不是迭代器)。
期货 0.3
Futures 0.3 没有直接平面图,但是有 StreamExt::flatten
, which can be used after a StreamExt::map
.
use futures::{stream, Stream, StreamExt}; // 0.3.1
fn into_many(i: i32) -> impl Stream<Item = i32> {
stream::iter(0..i)
}
fn nested() -> impl Stream<Item = i32> {
let stream_of_number = into_many(5);
let stream_of_stream_of_number = stream_of_number.map(into_many);
let flat_stream_of_number = stream_of_stream_of_number.flatten();
// Returns: 0, 0, 1, 0, 1, 2, 0, 1, 2, 3
flat_stream_of_number
}
期货 0.1
Futures 0.1 没有直接平面图,但是有Stream::flatten
, which can be used after a Stream::map
.
use futures::{stream, Stream}; // 0.1.25
fn into_many(i: i32) -> impl Stream<Item = i32, Error = ()> {
stream::iter_ok(0..i)
}
fn nested() -> impl Stream<Item = i32, Error = ()> {
let stream_of_number = into_many(5);
let stream_of_stream_of_number = stream_of_number.map(into_many);
let flat_stream_of_number = stream_of_stream_of_number.flatten();
// Returns: 0, 0, 1, 0, 1, 2, 0, 1, 2, 3
flat_stream_of_number
}
但是,这并不能解决您的问题。
streaming()
expects aS: Stream<Item = Bytes, Error>
(i.e.,Item = Bytes
) but myByteStream
hasItem = Vec<u8>
是的,这就是问题所在。使用 Bytes::from
via Stream::map
将流 Item
从一种类型转换为另一种类型:
use bytes::Bytes; // 0.4.11
use futures::Stream; // 0.1.25
fn example(stream: ByteStream, mut builder: HttpResponseBuilder) {
builder.streaming(stream.map(Bytes::from));
}