Performance and memory problems in a Rust async program
I want to benchmark requests from Rust to a particular service using an async client, and I wrote an async benchmarking program for this. The function below is supposed to run the given number of concurrent workers (actually parallel chains of futures) for the specified duration and report the achieved iteration count.
use futures::future;
use futures::prelude::*;
use std::error::Error;
use std::time::{Duration, Instant};
use std::{cell, io, rc};
use tokio::runtime::current_thread::Runtime;
use tokio::timer;

struct Config {
    workers: u32,
    duration: Duration,
}

/// Build infinitely repeating future
fn cycle<'a, F: Fn() -> P + 'a, P: Future + 'a>(
    f: F,
) -> Box<dyn Future<Item = (), Error = P::Error> + 'a> {
    Box::new(f().and_then(move |_| cycle(f)))
}

fn benchmark<'a, F: Fn() -> P + 'a, P: Future<Error = io::Error> + 'a>(
    config: Config,
    f: F,
) -> impl Future<Item = u32, Error = io::Error> + 'a {
    let counter = rc::Rc::new(cell::Cell::new(0u32));
    let f = rc::Rc::new(f);
    future::select_all((0..config.workers).map({
        let counter = rc::Rc::clone(&counter);
        move |_| {
            let counter = rc::Rc::clone(&counter);
            let f = rc::Rc::clone(&f);
            cycle(move || {
                let counter = rc::Rc::clone(&counter);
                f().map(move |_| {
                    counter.set(counter.get() + 1);
                })
            })
        }
    }))
    .map(|((), _, _)| ())
    .map_err(|(err, _, _)| err)
    .select(
        timer::Delay::new(Instant::now() + config.duration)
            .map_err(|err| io::Error::new(io::ErrorKind::Other, err.description())),
    )
    .map(move |((), _)| counter.get())
    .map_err(|(err, _)| err)
}

fn main() {
    let duration = std::env::args()
        .skip(1)
        .next()
        .expect("Please provide duration in seconds")
        .parse()
        .expect("Duration must be integer number");
    let ms = Duration::from_millis(1);
    let mut rt = Runtime::new().expect("Could not create runtime");
    loop {
        let iters = rt
            .block_on(
                benchmark(
                    Config {
                        workers: 65536,
                        duration: Duration::from_secs(duration),
                    },
                    || {
                        // Substitute actual benchmarked call
                        timer::Delay::new(Instant::now() + ms)
                            .map_err(|err| panic!("Failed to set delay: {:?}", err))
                    },
                )
                .map_err(|err| panic!("Benchmarking error: {:?}", err)),
            )
            .expect("Runtime error");
        println!("{} iters/sec", iters as u64 / duration);
    }
}
However, both the results this benchmark reports and its memory consumption degrade as the benchmark duration grows; for example, on my machine:
cargo run --release 1
~ 900k iterations/sec
cargo run --release 2
~ 700k iterations/sec
cargo run --release 10
~ 330k iterations/sec
Furthermore, memory usage grows quickly the longer the benchmarked function keeps running. I tried valgrind to look for memory leaks, but it only reports that all allocated memory is still reachable.
How can I fix this?
It looks like the Boxes returned by cycle are not freed until benchmark finishes, and memory allocation/deallocation takes more and more time.
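To make that hypothesis concrete, here is a minimal stand-alone sketch of the shape such a future takes (futures 0.1; the grow function and its bounded depth are invented for illustration and are not part of the benchmark): each completed step wraps the next future in a fresh Box that stays alive until the outermost future is dropped, and every poll has to walk the whole chain.

use futures::{future, Future};

fn grow(remaining: u32, depth: u32) -> Box<dyn Future<Item = u32, Error = ()>> {
    Box::new(future::ok(()).and_then(move |_| -> Box<dyn Future<Item = u32, Error = ()>> {
        if remaining == 0 {
            Box::new(future::ok(depth))
        } else {
            // One more heap allocation per completed step; none of them are freed
            // until the outermost future completes or is dropped.
            grow(remaining - 1, depth + 1)
        }
    }))
}

fn main() {
    // After 10 "iterations" the future is 10 Boxes deep; in the benchmark the
    // chain keeps growing for the whole configured duration.
    assert_eq!(grow(10, 0).wait(), Ok(10));
}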
I rewrote your program with async_await, without the Box, and now the results are consistent:
#![feature(async_await)]

use futures::{compat::Future01CompatExt, future, prelude::*, select};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Duration, Instant};
use tokio::timer;

struct Config {
    workers: u32,
    duration: Duration,
}

// Build infinitely repeating future
async fn cycle<'a, F: Fn() -> P + 'a, P: Future<Output = ()> + 'a>(f: F) {
    loop {
        f().await;
    }
}

async fn benchmark<'a, F: Fn() -> P + 'a, P: Future<Output = ()> + 'a>(
    config: Config,
    f: F,
) -> usize {
    let counter = AtomicUsize::new(0);
    let infinite_counter = future::select_all((0..config.workers).map(|_| {
        cycle(|| {
            f().map(|_| {
                counter.fetch_add(1, Ordering::SeqCst);
            })
        })
        .boxed_local()
    }));
    let timer = timer::Delay::new(Instant::now() + config.duration)
        .compat()
        .unwrap_or_else(|_| panic!("Boom !"));
    select! {
        a = infinite_counter.fuse() => (),
        b = timer.fuse() => (),
    };
    counter.load(Ordering::SeqCst)
}

fn main() {
    let duration = std::env::args()
        .skip(1)
        .next()
        .expect("Please provide duration in seconds")
        .parse()
        .expect("Duration must be integer number");
    let ms = Duration::from_millis(1);
    // Use actix_rt runtime instead of vanilla tokio because I want
    // to restrict to one OS thread and avoid needing async primitives
    let mut rt = actix_rt::Runtime::new().expect("Could not create runtime");
    loop {
        let iters = rt
            .block_on(
                benchmark(
                    Config {
                        workers: 65536,
                        duration: Duration::from_secs(duration),
                    },
                    || {
                        // Substitute actual benchmarked call
                        timer::Delay::new(Instant::now() + ms)
                            .compat()
                            .unwrap_or_else(|_| panic!("Boom !"))
                    },
                )
                .boxed_local()
                .unit_error()
                .compat(),
            )
            .expect("Runtime error");
        println!("{} iters/sec", iters as u64 / duration);
    }
}
This is my first time using futures 0.3, so I don't really understand parts like the select! syntax or boxed_local, but it works!
Edit: here is the dependencies block from Cargo.toml:
[dependencies]
futures-preview = { version = "0.3.0-alpha", features = ["nightly", "compat", "async-await"] }
tokio = "0.1.22"
actix-rt = "0.2.3"
So it turns out cycle really was the culprit. I found this useful function in the futures crate: loop_fn, and used it to rewrite cycle:
/// Build infinitely repeating future
fn cycle<'a, F: Fn() -> P + 'a, P: Future + 'a>(
    f: F,
) -> impl Future<Item = (), Error = P::Error> + 'a {
    future::loop_fn((), move |_| f().map(|_| future::Loop::Continue(())))
}
The rest of the code stays unchanged. It now compiles on stable Rust, and it even reports almost twice the iterations per second of the proposed nightly-futures solution (for whatever this synthetic test is worth).
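For comparison, here is a minimal stand-alone sketch of the state-passing pattern behind loop_fn (futures 0.1; the counting example is invented): the closure returns Loop::Continue with the next state or Loop::Break with the final value, so the driver re-runs the closure in place instead of nesting a new future inside the old one, and the future's size stays constant no matter how many iterations run.

use futures::future::{self, Loop};
use futures::Future;

fn main() {
    let count_to_five = future::loop_fn(0u32, |n| {
        future::ok::<_, ()>(if n < 5 {
            Loop::Continue(n + 1) // keep looping with the updated state
        } else {
            Loop::Break(n) // finish and yield the final value
        })
    });
    assert_eq!(count_to_five.wait(), Ok(5));
}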