在高负载下测试 spring webflux Webclient 时出现问题
Problems testing spring webflux Webclient with high load
我正在尝试学习 Spring 来自 C# 和 NetCore 的 Webflux,我们有一个非常相似的问题,如 this post,其中第三方服务提供商有一些响应时间问题。
但是用spring-webclient测试是响应时间加倍,不知道是不是漏了什么
我试图创建一个类似的例子:
- 一台电脑运行3台服务器
- 模拟一些随机延迟时间的演示服务器(端口 8080)
- 使用异步调用我的“等待”服务器(端口 5000)的 C# 中的测试服务器
- 使用 spring 和 webclient 测试服务器以调用我的“等待”服务器(端口 8081)
- 其他计算机运行 JMeter 有 1000 个客户端,每个客户端有 10 个回合
一些代码
等待服务器
只是一条简单的路线
@Configuration
class TestRouter(private val middlemanDemo: MiddlemanDemo) {
@Bean
fun route() = router {
GET("/testWait", middlemanDemo::middleTestAndGetWait)
}
}
处理程序有一个带有种子的随机生成器,因此每个测试都可以生成相同的延迟序列
@Service
class TestWaiter {
companion object RandomManager {
private lateinit var random: Random
init {
resetTimer()
}
@Synchronized
fun next(): Long {
val random = random.nextLong(0, 10)
return random * 2
}
fun resetTimer() {
random = Random(12345)
}
}
private val logger = LoggerFactory.getLogger(javaClass)
fun testAndGetWait(request: ServerRequest): Mono<ServerResponse> {
val wait = next()
logger.debug("Wait is: {}", wait)
return ServerResponse
.ok()
.json()
.bodyValue(wait)
.delayElement(Duration.ofSeconds(wait))
}
fun reset(request: ServerRequest): Mono<ServerResponse> {
logger.info("Random reset")
resetTimer()
return ServerResponse
.ok()
.build()
}
}
使用 JMeter 对服务器进行负载测试我可以看到大约 9-10 秒的稳定响应时间和 100/秒的最大吞吐量:
C# 异步演示服务器
尝试使用 C# 中间人,此服务器仅调用主演示服务器:
控制器
[HttpGet]
public async Task<string> Get()
{
return await _waiterClient.GetWait();
}
以及带有 httpClient 的服务
private readonly HttpClient _client;
public WaiterClient(HttpClient client)
{
_client = client;
client.BaseAddress = new Uri("http://192.168.0.121:8080");
}
public async Task<string> GetWait()
{
var response = await _client.GetAsync("/testWait");
var waitTime = await response.Content.ReadAsStringAsync();
return waitTime;
}
}
测试此服务给出了相同的响应时间,但开销吞吐量略少,但这是可以理解的
spring-webclient 实现
这个客户端也很简单,就一个路由
@Configuration
class TestRouter(private val middlemanDemo: MiddlemanDemo) {
@Bean
fun route() = router {
GET("/testWait", middlemanDemo::middleTestAndGetWait)
}
}
处理程序仅使用网络客户端调用服务
@Service
class MiddlemanDemo {
private val client = WebClient.create("http://127.0.0.1:8080")
fun middleTestAndGetWait(request: ServerRequest): Mono<ServerResponse> {
return client
.get()
.uri("/testWait")
.retrieve()
.bodyToMono(Int::class.java)
.flatMap(::processResponse)
}
fun processResponse(delay: Int): Mono<ServerResponse> {
return ServerResponse
.ok()
.bodyValue(delay)
}
}
然而,运行测试,吞吐量仅达到50/sec
而且响应时间会加倍,就像我再等一次一样,直到负载再次下降
我认为可能是池获取时间问题。
我假设您的服务器超过 1k TPS,并且每个请求看起来大约需要 9 秒。但是默认的HTTP客户端连接池是500。请参考Projector Reactor - Connection Pool.
请检查日志 PoolAcquireTimeoutException
或者您的服务器是否需要一些时间来等待池获取。
我标记 答案是因为它指出了我正确的方法,但我会添加完整的解决方案供任何人查找:
关键是根据我的需要创建一个连接池。默认值为 500,如 JK.Lee 所述。
@Service
class MiddlemanDemo(webClientBuilder: WebClient.Builder) {
private val client: WebClient
init {
val provider = ConnectionProvider.builder("fixed")
.maxConnections(2000) // This is the important part
.build()
val httpClient = HttpClient
.create(provider)
client = webClientBuilder
.clientConnector(ReactorClientHttpConnector(httpClient))
.baseUrl("http://localhost:8080")
.build()
}
fun middleTestAndGetWait(request: ServerRequest): Mono<ServerResponse> {
return client
.get()
.uri("/testWait")
.retrieve()
.bodyToMono(Int::class.java)
.flatMap(::processResponse)
}
fun processResponse(delay: Int): Mono<ServerResponse> {
return ServerResponse
.ok()
.bodyValue(delay)
}
}
我正在尝试学习 Spring 来自 C# 和 NetCore 的 Webflux,我们有一个非常相似的问题,如 this post,其中第三方服务提供商有一些响应时间问题。
但是用spring-webclient测试是响应时间加倍,不知道是不是漏了什么
我试图创建一个类似的例子:
- 一台电脑运行3台服务器
- 模拟一些随机延迟时间的演示服务器(端口 8080)
- 使用异步调用我的“等待”服务器(端口 5000)的 C# 中的测试服务器
- 使用 spring 和 webclient 测试服务器以调用我的“等待”服务器(端口 8081)
- 其他计算机运行 JMeter 有 1000 个客户端,每个客户端有 10 个回合
一些代码
等待服务器
只是一条简单的路线
@Configuration
class TestRouter(private val middlemanDemo: MiddlemanDemo) {
@Bean
fun route() = router {
GET("/testWait", middlemanDemo::middleTestAndGetWait)
}
}
处理程序有一个带有种子的随机生成器,因此每个测试都可以生成相同的延迟序列
@Service
class TestWaiter {
companion object RandomManager {
private lateinit var random: Random
init {
resetTimer()
}
@Synchronized
fun next(): Long {
val random = random.nextLong(0, 10)
return random * 2
}
fun resetTimer() {
random = Random(12345)
}
}
private val logger = LoggerFactory.getLogger(javaClass)
fun testAndGetWait(request: ServerRequest): Mono<ServerResponse> {
val wait = next()
logger.debug("Wait is: {}", wait)
return ServerResponse
.ok()
.json()
.bodyValue(wait)
.delayElement(Duration.ofSeconds(wait))
}
fun reset(request: ServerRequest): Mono<ServerResponse> {
logger.info("Random reset")
resetTimer()
return ServerResponse
.ok()
.build()
}
}
使用 JMeter 对服务器进行负载测试我可以看到大约 9-10 秒的稳定响应时间和 100/秒的最大吞吐量:
C# 异步演示服务器
尝试使用 C# 中间人,此服务器仅调用主演示服务器:
控制器
[HttpGet]
public async Task<string> Get()
{
return await _waiterClient.GetWait();
}
以及带有 httpClient 的服务
private readonly HttpClient _client;
public WaiterClient(HttpClient client)
{
_client = client;
client.BaseAddress = new Uri("http://192.168.0.121:8080");
}
public async Task<string> GetWait()
{
var response = await _client.GetAsync("/testWait");
var waitTime = await response.Content.ReadAsStringAsync();
return waitTime;
}
}
测试此服务给出了相同的响应时间,但开销吞吐量略少,但这是可以理解的
spring-webclient 实现
这个客户端也很简单,就一个路由
@Configuration
class TestRouter(private val middlemanDemo: MiddlemanDemo) {
@Bean
fun route() = router {
GET("/testWait", middlemanDemo::middleTestAndGetWait)
}
}
处理程序仅使用网络客户端调用服务
@Service
class MiddlemanDemo {
private val client = WebClient.create("http://127.0.0.1:8080")
fun middleTestAndGetWait(request: ServerRequest): Mono<ServerResponse> {
return client
.get()
.uri("/testWait")
.retrieve()
.bodyToMono(Int::class.java)
.flatMap(::processResponse)
}
fun processResponse(delay: Int): Mono<ServerResponse> {
return ServerResponse
.ok()
.bodyValue(delay)
}
}
然而,运行测试,吞吐量仅达到50/sec
而且响应时间会加倍,就像我再等一次一样,直到负载再次下降
我认为可能是池获取时间问题。
我假设您的服务器超过 1k TPS,并且每个请求看起来大约需要 9 秒。但是默认的HTTP客户端连接池是500。请参考Projector Reactor - Connection Pool.
请检查日志 PoolAcquireTimeoutException
或者您的服务器是否需要一些时间来等待池获取。
我标记
关键是根据我的需要创建一个连接池。默认值为 500,如 JK.Lee 所述。
@Service
class MiddlemanDemo(webClientBuilder: WebClient.Builder) {
private val client: WebClient
init {
val provider = ConnectionProvider.builder("fixed")
.maxConnections(2000) // This is the important part
.build()
val httpClient = HttpClient
.create(provider)
client = webClientBuilder
.clientConnector(ReactorClientHttpConnector(httpClient))
.baseUrl("http://localhost:8080")
.build()
}
fun middleTestAndGetWait(request: ServerRequest): Mono<ServerResponse> {
return client
.get()
.uri("/testWait")
.retrieve()
.bodyToMono(Int::class.java)
.flatMap(::processResponse)
}
fun processResponse(delay: Int): Mono<ServerResponse> {
return ServerResponse
.ok()
.bodyValue(delay)
}
}