控制衍生期货的数量以产生背压

Controlling the number of spawned futures to create backpressure

我正在使用 futures-rs powered version of the Rusoto AWS Kinesis library。我需要生成 AWS Kinesis 请求的深层管道以实现高吞吐量,因为 Kinesis 对每个 HTTP 请求有 500 条记录的限制。结合发送请求的 50 毫秒延迟,我需要开始生成许多并发请求。我希望按 100 个飞行请求的顺序创建某个地方。

Rusoto put_records 函数签名如下所示:

fn put_records(
    &self,
    input: &PutRecordsInput,
) -> RusotoFuture<PutRecordsOutput, PutRecordsError>

RusotoFuture 是这样定义的包装器:

/// Future that is returned from all rusoto service APIs.
pub struct RusotoFuture<T, E> {
    inner: Box<Future<Item = T, Error = E> + 'static>,
}

内部 Future 已包装,但 RusutoFuture 仍实现 Future::poll(),因此我相信它与 futures-rs 生态系统兼容。 RusotoFuture提供同步调用:

impl<T, E> RusotoFuture<T, E> {
    /// Blocks the current thread until the future has resolved.
    ///
    /// This is meant to provide a simple way for non-async consumers
    /// to work with rusoto.
    pub fn sync(self) -> Result<T, E> {
        self.wait()
    }
}

我可以发出请求并 sync() 它,从 AWS 获取结果。我想创建许多请求,将它们放在某种 queue/list 中,然后收集完成的请求。如果请求出错,我需要重新发出请求(这在 Kinesis 中有些正常,尤其是在达到分片吞吐量限制时)。如果请求成功完成,我应该发出一个包含新数据的请求。我可以为每个请求生成一个线程并同步它,但是当我有异步 IO 线程 运行.

时,这似乎效率很低

我曾尝试在我的应用程序线程中使用 futures::sync::mpsc::channel(而不是在 Tokio 反应器内部使用 运行),但是每当我克隆 tx 时,它都会生成自己的缓冲区,从而消除任何种类send 上的背压:

fn kinesis_pipeline(client: DefaultKinesisClient, stream_name: String, num_puts: usize, puts_size: usize) {
    use futures::sync::mpsc::{ channel, spawn };
    use futures::{ Sink, Future, Stream };
    use futures::stream::Sender;
    use rusoto_core::reactor::DEFAULT_REACTOR;

    let client = Arc::new(KinesisClient::simple(Region::UsWest2));
    let data = FauxData::new(); // a data generator for testing

    let (mut tx, mut rx) = channel(1);

    for rec in data {
        tx.clone().send(rec);
    }
}

没有克隆,我有错误:

error[E0382]: use of moved value: `tx`
   --> src/main.rs:150:9
    |
150 |         tx.send(rec);
    |         ^^ value moved here in previous iteration of loop
    |
    = note: move occurs because `tx` has type `futures::sync::mpsc::Sender<rusoto_kinesis::PutRecordsRequestEntry>`, which does not implement the `Copy` trait

我也根据建议查看了 futures::mpsc::sync::spawn,但它占用了 rx 的所有权(作为 Stream)并且没有解决我对 [=29 的问题=] 的 tx 导致无限行为。

我希望如果我能让 channel/spawn 用法正常工作,我将有一个需要 RusotoFutures 的系统,等待它们完成,然后为我提供了一种从我的应用程序线程中获取完成结果的简单方法。

据我所知,channel is not that a single clone of the Sender 将容量增加 1 的问题是,您为要发送的每个项目克隆了 Sender

您在没有 clone 的情况下看到的错误来自您对 Sink::send 界面的错误使用。使用 clone 你实际上应该看到警告:

warning: unused `futures::sink::Send` which must be used: futures do nothing unless polled

也就是说:您当前的代码实际上并没有发送任何东西!

为了应用背压,您需要链接那些 send 调用;每一个都应该等到前一个完成(你也需要等待最后一个!);成功后你会得到 Sender 回来。执行此操作的最佳方法是使用 iter_ok and to pass it to send_all.

从迭代器生成 Stream

现在你有一个未来 SendAll 你需要“开车”。如果您忽略结果并在出错时出现恐慌 (.then(|r| { r.unwrap(); Ok::<(), ()>(()) })),您可以将其作为一个单独的任务生成,但也许您想将其集成到您的主应用程序中(即 return 它在 Box).

// this returns a `Box<Future<Item = (), Error = ()>>`. you may
// want to use a different error type
Box::new(tx.send_all(iter_ok(data)).map(|_| ()).map_err(|_| ()))

RusotoFuture::syncFuture::wait

不要使用 Future::wait:它已经在某个分支中被弃用,而且它通常不会做您真正想要的。我怀疑 RusotoFuture 是否意识到这些问题,所以我建议避免使用 RusotoFuture::sync.

克隆Sender增加信道容量

如您正确所述,克隆 Sender 将容量增加一倍。

这似乎是为了提高性能:A Sender 以未阻塞(“unparked”)状态启动;如果 Sender 没有被阻止,它可以发送一个项目而不被阻止。但是,如果队列中的项目数在 Sender 发送项目时达到配置的限制,则 Sender 将被阻止(“停放”)。 (从队列中移除项目将在特定时间解锁 Sender。)

这意味着在内部队列达到限制后,每个 Sender 仍然可以发送一个项目,这会导致容量增加的记录效果,但前提是实际上所有 Sender 都是发送项目 - 未使用的 Senders 不会增加观察到的容量。

性能提升来自于这样一个事实,即只要您没有达到限制,它就不需要停放和通知任务(这非常繁重)。

mpsc 模块顶部的私人文档描述了更多细节。