向 SelectAll 流中 push 新的流
push on SelectAll Stream
我想要一个用于处理若干个流的结构体。所有流的元素类型(Item)都相同。逻辑如下:我需要创建一个单一的流,其中包含来自所有其他流的所有元素。我还需要能够把“新”的流添加到这个“主”流中。我不关心下一个元素来自哪个流。
为此,我发现 select_all 函数应该可以实现上述逻辑。
/// Pool that merges several streams of `MyItem` into one `SelectAll`,
/// stored behind an `Arc<Mutex<..>>`.
///
/// NOTE: this is the snippet exactly as posted in the question. It does not
/// compile; the E0277/E0599 diagnostics quoted after it are produced by this code.
pub struct WsPool {
// The quoted E0277 reports that this boxed trait object is not `Unpin`,
// which `SelectAll::new` / `push` require of the inner stream type.
merge: Arc<Mutex<SelectAll<Box<dyn Stream<Item=MyItem> + Send + 'static>>>>,
}
impl WsPool {
/// Creates a pool wrapping an empty `SelectAll`.
pub fn new() -> Self {
Self {
merge: Arc::new(Mutex::new(SelectAll::new())),
}
}
/// Locks the mutex and pushes the stream `s` into the merged stream.
///
/// Panics if `lock()` returns an error, because the result is unwrapped.
pub fn add(&self, s: Box<dyn Stream<Item = MyItem> + Send + 'static>) {
let mut merge = self.merge.lock().unwrap();
merge.push(s);
}
/// Loops without a break, taking the lock on every iteration and trying to
/// fetch the next item from the merged stream.
pub async fn process(&self) {
loop {
let mut merge = self.merge.lock().unwrap();
// `.await` is applied to the `MutexGuard` rather than to the value returned
// by `next()`; rustc rejects this with E0277 ("is not a future").
let item = merge.await.next();
}
}
}
但我收到这些错误:
error[E0277]: `std::sync::MutexGuard<'_, futures::stream::SelectAll<Box<(dyn futures::Stream<Item = MyItem> + std::marker::Send + 'static)>>>` is not a future
--> src/ws_pool.rs:30:24
|
30 | let item = merge.await.next();
| ^^^^^^^^^^^ `std::sync::MutexGuard<'_, futures::stream::SelectAll<Box<(dyn futures::Stream<Item = MyItem> + std::marker::Send + 'static)>>>` is not a future
|
= help: the trait `futures::Future` is not implemented for `std::sync::MutexGuard<'_, futures::stream::SelectAll<Box<(dyn futures::Stream<Item = MyItem> + std::marker::Send + 'static)>>>`
note: required by `futures::Future::poll`
--> /home/allevo/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/future/future.rs:99:5
|
99 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
error[E0277]: `(dyn futures::Stream<Item = MyItem> + std::marker::Send + 'static)` cannot be unpinned
--> src/ws_pool.rs:17:40
|
17 | merge: Arc::new(Mutex::new(SelectAll::new())),
| ^^^^^^^^^^^^^^^^ the trait `Unpin` is not implemented for `(dyn futures::Stream<Item = MyItem> + std::marker::Send + 'static)`
|
= note: consider using `Box::pin`
= note: required because of the requirements on the impl of `futures::Stream` for `Box<(dyn futures::Stream<Item = MyItem> + std::marker::Send + 'static)>`
note: required by `futures::stream::SelectAll::<St>::new`
--> /home/allevo/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.17/src/stream/select_all.rs:47:5
|
47 | pub fn new() -> Self {
| ^^^^^^^^^^^^^^^^^^^^
error[E0599]: the method `push` exists for struct `std::sync::MutexGuard<'_, futures::stream::SelectAll<Box<(dyn futures::Stream<Item = MyItem> + std::marker::Send + 'static)>>>`, but its trait bounds were not satisfied
--> src/ws_pool.rs:24:15
|
24 | merge.push(s);
| ^^^^ method cannot be called on `std::sync::MutexGuard<'_, futures::stream::SelectAll<Box<(dyn futures::Stream<Item = MyItem> + std::marker::Send + 'static)>>>` due to unsatisfied trait bounds
|
::: /home/allevo/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/alloc/src/boxed.rs:172:1
|
172 | / pub struct Box<
173 | | T: ?Sized,
174 | | #[unstable(feature = "allocator_api", issue = "32838")] A: Allocator = Global,
175 | | >(Unique<T>, A);
| |________________- doesn't satisfy `_: futures::Stream`
|
= note: the following trait bounds were not satisfied:
`Box<(dyn futures::Stream<Item = MyItem> + std::marker::Send + 'static)>: futures::Stream`
我哪里做错了?或者说,我应该如何存储多个流并对它们进行迭代?
你的第一个问题只是一个简单的混淆:你应该 await 的是“下一个值”(即 next() 返回的 future),而不是流本身:
let item = merge.next().await;
其余的错误都是因为 SelectAll<Box<dyn Stream + Send + 'static>> 没有实现 Stream。如果你查看 impl Stream for SelectAll,会发现它要求内部的流类型必须实现 Unpin。
你可以把 Unpin 加到 trait 约束(bound)中来解决这个问题:
use std::marker::Unpin;
// vvvvv
Arc<Mutex<SelectAll<Box<dyn Stream<Item=MyItem> + Unpin + Send + 'static>>>>
或者,更好的解决方案是把流固定(pin)住:
use std::pin::Pin;
// vvv
Arc<Mutex<SelectAll<Pin<Box<dyn Stream<Item=MyItem> + Send + 'static>>>>>
两者的区别在于,后者可以接受更多种类的 Stream 类型。你只需要在添加流时使用 Box::pin 即可。
查看它在 Playground 上的工作情况。
我想要一个用于处理若干个流的结构体。所有流的元素类型(Item)都相同。逻辑如下:我需要创建一个单一的流,其中包含来自所有其他流的所有元素。我还需要能够把“新”的流添加到这个“主”流中。我不关心下一个元素来自哪个流。
为此,我发现 select_all 函数应该可以实现上述逻辑。
/// Pool that merges several streams of `MyItem` into one `SelectAll`,
/// stored behind an `Arc<Mutex<..>>`.
///
/// NOTE: this is the snippet exactly as posted in the question. It does not
/// compile; the E0277/E0599 diagnostics quoted after it are produced by this code.
pub struct WsPool {
// The quoted E0277 reports that this boxed trait object is not `Unpin`,
// which `SelectAll::new` / `push` require of the inner stream type.
merge: Arc<Mutex<SelectAll<Box<dyn Stream<Item=MyItem> + Send + 'static>>>>,
}
impl WsPool {
/// Creates a pool wrapping an empty `SelectAll`.
pub fn new() -> Self {
Self {
merge: Arc::new(Mutex::new(SelectAll::new())),
}
}
/// Locks the mutex and pushes the stream `s` into the merged stream.
///
/// Panics if `lock()` returns an error, because the result is unwrapped.
pub fn add(&self, s: Box<dyn Stream<Item = MyItem> + Send + 'static>) {
let mut merge = self.merge.lock().unwrap();
merge.push(s);
}
/// Loops without a break, taking the lock on every iteration and trying to
/// fetch the next item from the merged stream.
pub async fn process(&self) {
loop {
let mut merge = self.merge.lock().unwrap();
// `.await` is applied to the `MutexGuard` rather than to the value returned
// by `next()`; rustc rejects this with E0277 ("is not a future").
let item = merge.await.next();
}
}
}
但我收到这些错误:
error[E0277]: `std::sync::MutexGuard<'_, futures::stream::SelectAll<Box<(dyn futures::Stream<Item = MyItem> + std::marker::Send + 'static)>>>` is not a future
--> src/ws_pool.rs:30:24
|
30 | let item = merge.await.next();
| ^^^^^^^^^^^ `std::sync::MutexGuard<'_, futures::stream::SelectAll<Box<(dyn futures::Stream<Item = MyItem> + std::marker::Send + 'static)>>>` is not a future
|
= help: the trait `futures::Future` is not implemented for `std::sync::MutexGuard<'_, futures::stream::SelectAll<Box<(dyn futures::Stream<Item = MyItem> + std::marker::Send + 'static)>>>`
note: required by `futures::Future::poll`
--> /home/allevo/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/future/future.rs:99:5
|
99 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
error[E0277]: `(dyn futures::Stream<Item = MyItem> + std::marker::Send + 'static)` cannot be unpinned
--> src/ws_pool.rs:17:40
|
17 | merge: Arc::new(Mutex::new(SelectAll::new())),
| ^^^^^^^^^^^^^^^^ the trait `Unpin` is not implemented for `(dyn futures::Stream<Item = MyItem> + std::marker::Send + 'static)`
|
= note: consider using `Box::pin`
= note: required because of the requirements on the impl of `futures::Stream` for `Box<(dyn futures::Stream<Item = MyItem> + std::marker::Send + 'static)>`
note: required by `futures::stream::SelectAll::<St>::new`
--> /home/allevo/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.17/src/stream/select_all.rs:47:5
|
47 | pub fn new() -> Self {
| ^^^^^^^^^^^^^^^^^^^^
error[E0599]: the method `push` exists for struct `std::sync::MutexGuard<'_, futures::stream::SelectAll<Box<(dyn futures::Stream<Item = MyItem> + std::marker::Send + 'static)>>>`, but its trait bounds were not satisfied
--> src/ws_pool.rs:24:15
|
24 | merge.push(s);
| ^^^^ method cannot be called on `std::sync::MutexGuard<'_, futures::stream::SelectAll<Box<(dyn futures::Stream<Item = MyItem> + std::marker::Send + 'static)>>>` due to unsatisfied trait bounds
|
::: /home/allevo/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/alloc/src/boxed.rs:172:1
|
172 | / pub struct Box<
173 | | T: ?Sized,
174 | | #[unstable(feature = "allocator_api", issue = "32838")] A: Allocator = Global,
175 | | >(Unique<T>, A);
| |________________- doesn't satisfy `_: futures::Stream`
|
= note: the following trait bounds were not satisfied:
`Box<(dyn futures::Stream<Item = MyItem> + std::marker::Send + 'static)>: futures::Stream`
我哪里做错了?或者说,我应该如何存储多个流并对它们进行迭代?
你的第一个问题只是一个简单的混淆:你应该 await 的是“下一个值”(即 next() 返回的 future),而不是流本身:
let item = merge.next().await;
其余的错误都是因为 SelectAll<Box<dyn Stream + Send + 'static>> 没有实现 Stream。如果你查看 impl Stream for SelectAll,会发现它要求内部的流类型必须实现 Unpin。
你可以把 Unpin 加到 trait 约束(bound)中来解决这个问题:
use std::marker::Unpin;
// vvvvv
Arc<Mutex<SelectAll<Box<dyn Stream<Item=MyItem> + Unpin + Send + 'static>>>>
或者,更好的解决方案是把流固定(pin)住:
use std::pin::Pin;
// vvv
Arc<Mutex<SelectAll<Pin<Box<dyn Stream<Item=MyItem> + Send + 'static>>>>>
两者的区别在于,后者可以接受更多种类的 Stream 类型。你只需要在添加流时使用 Box::pin 即可。
查看它在 Playground 上的工作情况。