Ribbon load balancer with WebClient differs from the RestTemplate one (not properly balanced)

I tried to use WebClient together with LoadBalancerExchangeFilterFunction:

WebClient configuration:

@Bean
public WebClient myWebClient(final LoadBalancerExchangeFilterFunction lbFunction) {
    return WebClient.builder()
            .filter(lbFunction)
            .defaultHeader(ACCEPT, APPLICATION_JSON_VALUE)
            .defaultHeader(CONTENT_TYPE, APPLICATION_JSON_VALUE)
            .build();
} 

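Then I noticed that calls to the underlying service are not properly load balanced: there is a constant difference in RPS between the instances.

Then I switched back to RestTemplate, and it works fine.

RestTemplate configuration: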

private static final int CONNECT_TIMEOUT_MILLIS = 18 * DateTimeConstants.MILLIS_PER_SECOND;
private static final int READ_TIMEOUT_MILLIS = 18 * DateTimeConstants.MILLIS_PER_SECOND;

@LoadBalanced
@Bean
public RestTemplate restTemplateSearch(final RestTemplateBuilder restTemplateBuilder) {
    return restTemplateBuilder
            .errorHandler(errorHandlerSearch())
            .requestFactory(this::bufferedClientHttpRequestFactory)
            .build();
}

private ClientHttpRequestFactory bufferedClientHttpRequestFactory() {
    final SimpleClientHttpRequestFactory requestFactory = new SimpleClientHttpRequestFactory();
    requestFactory.setConnectTimeout(CONNECT_TIMEOUT_MILLIS);
    requestFactory.setReadTimeout(READ_TIMEOUT_MILLIS);
    return new BufferingClientHttpRequestFactory(requestFactory);
}

private ResponseErrorHandler errorHandlerSearch() {
    return new DefaultResponseErrorHandler() {
        @Override
        public boolean hasError(ClientHttpResponse response) throws IOException {
            return response.getStatusCode().is5xxServerError();
        }
    };
}

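[Chart: load balancing with the WebClient configuration up to 11:25, then after switching back to RestTemplate]

Why is there such a difference, and how can I use WebClient and still get the same RPS on each instance? A clue might be that older instances receive more requests than newer ones.

I did a bit of debugging, and the same logic (defaults such as ZoneAwareLoadBalancer) is invoked in both cases.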

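You have to configure Ribbon to change the load-balancing behaviour (please read below).

By default (as you found out yourself) ZoneAwareLoadBalancer is used. In the source code for ZoneAwareLoadBalancer we read (I have highlighted some mechanics that could result in the RPS pattern you see):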

The key metric used to measure the zone condition is Average Active Requests, which is aggregated per rest client per zone. It is the total outstanding requests in a zone divided by number of available targeted instances (excluding circuit breaker tripped instances). This metric is very effective when timeout occurs slowly on a bad zone.

The LoadBalancer will calculate and examine zone stats of all available zones. If the Average Active Requests for any zone has reached a configured threshold, this zone will be dropped from the active server list. In case more than one zone has reached the threshold, the zone with the most active requests per server will be dropped. Once the the worst zone is dropped, a zone will be chosen among the rest with the probability proportional to its number of instances.
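
To make the quoted description more concrete, here is a rough sketch of the metric and the "drop the worst zone" step in plain Java. This is not Ribbon's code; the class name, zone names and the threshold of 20 are made up for illustration, and the final "pick a remaining zone with probability proportional to its instance count" step is left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of the quoted description, NOT Ribbon's implementation.
final class ZoneLoadSketch {

    /** Average Active Requests: outstanding requests divided by available instances. */
    static double averageActiveRequests(int outstandingRequests, int availableInstances) {
        if (availableInstances <= 0) {
            throw new IllegalArgumentException("zone has no available instances");
        }
        return (double) outstandingRequests / availableInstances;
    }

    /**
     * Returns the zone to drop: the one with the highest average among the zones
     * at or above the threshold, or null when every zone is below the threshold.
     * Each map value is {outstandingRequests, availableInstances}.
     */
    static String zoneToDrop(Map<String, int[]> zones, double threshold) {
        String worst = null;
        double worstLoad = -1;
        for (Map.Entry<String, int[]> e : zones.entrySet()) {
            double load = averageActiveRequests(e.getValue()[0], e.getValue()[1]);
            if (load >= threshold && load > worstLoad) {
                worst = e.getKey();
                worstLoad = load;
            }
        }
        return worst;
    }

    public static void main(String[] args) {
        Map<String, int[]> zones = new LinkedHashMap<>();
        zones.put("zone-a", new int[] {30, 3}); // 10.0 active requests per instance
        zones.put("zone-b", new int[] {90, 3}); // 30.0 active requests per instance
        zones.put("zone-c", new int[] {8, 4});  // 2.0 active requests per instance
        // The threshold of 20.0 is an assumed value for this example only.
        System.out.println(zoneToDrop(zones, 20.0)); // prints "zone-b"
    }
}
```

The point is that the decision is driven by outstanding requests per instance, so slow or long-lived requests on some instances can shift traffic even when the instance list itself does not change.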

If your traffic is served by a single zone (perhaps the same box?), you might see some additional confusing effects.

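Please also note that after the change, without LoadBalancerExchangeFilterFunction, the average RPS is the same as with it (on the chart all lines converge to the median line). So, looking globally, both load-balancing strategies consume the same amount of available bandwidth, just in a different fashion.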

To modify your Ribbon client settings, try the following:

public class RibbonConfig {

  @Autowired
  IClientConfig ribbonClientConfig;

  @Bean
  public IPing ribbonPing (IClientConfig config) {
    return new PingUrl();//default is a NoOpPing
  }

  @Bean
  public IRule ribbonRule(IClientConfig config) {
    return new AvailabilityFilteringRule(); // here override the default ZoneAvoidanceRule
  }

}

Then don't forget to define your Ribbon client configuration globally:

@SpringBootApplication
@RibbonClient(name = "app", configuration = RibbonConfig.class)
public class App {
  //...
}
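
To give a feel for what swapping the rule changes, here is a simplified plain-Java sketch of the idea behind an availability-filtering rule as I understand it: go round robin, but skip servers that are circuit-tripped or already have too many active connections. It is not the Ribbon class itself; the `Server` type, its fields and the connection limit are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified illustration only, NOT Ribbon's AvailabilityFilteringRule.
final class AvailabilityFilteringSketch {

    /** Hypothetical view of a backend instance as seen by the client. */
    static final class Server {
        final String id;
        final boolean circuitTripped;
        final int activeConnections;

        Server(String id, boolean circuitTripped, int activeConnections) {
            this.id = id;
            this.circuitTripped = circuitTripped;
            this.activeConnections = activeConnections;
        }
    }

    private final AtomicInteger counter = new AtomicInteger();
    private final int maxActiveConnections; // assumed limit, chosen by the caller

    AvailabilityFilteringSketch(int maxActiveConnections) {
        this.maxActiveConnections = maxActiveConnections;
    }

    /** Round robin over the list, skipping unavailable servers; null if none qualifies. */
    Server choose(List<Server> servers) {
        // Inspect at most one full cycle, continuing from the last round-robin position.
        for (int i = 0; i < servers.size(); i++) {
            int idx = Math.floorMod(counter.getAndIncrement(), servers.size());
            Server s = servers.get(idx);
            if (!s.circuitTripped && s.activeConnections < maxActiveConnections) {
                return s;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<Server> servers = Arrays.asList(
                new Server("a", false, 0),
                new Server("b", true, 0),   // circuit-tripped, always skipped
                new Server("c", false, 2));
        AvailabilityFilteringSketch rule = new AvailabilityFilteringSketch(10);
        for (int i = 0; i < 4; i++) {
            System.out.println(rule.choose(servers).id); // prints a, c, a, c
        }
    }
}
```

With healthy instances this degenerates to plain round robin, which is why it tends to give a flatter per-instance RPS than a load-sensitive, zone-based choice.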

Hope this helps!

I made a simple POC, and everything works exactly the same with WebClient and RestTemplate under the default configuration.

Rest server code:

@SpringBootApplication
internal class RestServerApplication

fun main(args: Array<String>) {
    runApplication<RestServerApplication>(*args)
}

class BeansInitializer : ApplicationContextInitializer<GenericApplicationContext> {
    override fun initialize(context: GenericApplicationContext) {
        serverBeans().initialize(context)
    }
}

fun serverBeans() = beans {
    bean("serverRoutes") {
        PingRoutes(ref()).router()
    }
    bean<PingHandler>()
}

internal class PingRoutes(private val pingHandler: PingHandler) {
    fun router() = router {
        GET("/api/ping", pingHandler::ping)
    }
}

class PingHandler(private val env: Environment) {
    fun ping(serverRequest: ServerRequest): Mono<ServerResponse> {
        return Mono
            .fromCallable {
                // sleep added to simulate some work
                Thread.sleep(2000)
            }
            .subscribeOn(elastic())
            .flatMap {
                ServerResponse.ok()
                    .syncBody("pong-${env["HOSTNAME"]}-${env["server.port"]}")
            }
    }
}

Add to application.yaml:

context.initializer.classes: com.lbpoc.server.BeansInitializer

Dependencies in gradle:

implementation('org.springframework.boot:spring-boot-starter-webflux')

Rest client code:

@SpringBootApplication
internal class RestClientApplication {
    @Bean
    @LoadBalanced
    fun webClientBuilder(): WebClient.Builder {
        return WebClient.builder()
    }

    @Bean
    @LoadBalanced
    fun restTemplate() = RestTemplateBuilder().build()
}

fun main(args: Array<String>) {
    runApplication<RestClientApplication>(*args)
}

class BeansInitializer : ApplicationContextInitializer<GenericApplicationContext> {
    override fun initialize(context: GenericApplicationContext) {
        clientBeans().initialize(context)
    }
}

fun clientBeans() = beans {
    bean("clientRoutes") {
        PingRoutes(ref()).router()
    }
    bean<PingHandlerWithWebClient>()
    bean<PingHandlerWithRestTemplate>()
}

internal class PingRoutes(private val pingHandlerWithWebClient: PingHandlerWithWebClient) {
    fun router() = org.springframework.web.reactive.function.server.router {
        GET("/api/ping", pingHandlerWithWebClient::ping)
    }
}

class PingHandlerWithWebClient(private val webClientBuilder: WebClient.Builder) {
    fun ping(serverRequest: ServerRequest) = webClientBuilder.build()
        .get()
        .uri("http://rest-server-poc/api/ping")
        .retrieve()
        .bodyToMono(String::class.java)
        .onErrorReturn(TimeoutException::class.java, "Read/write timeout")
        .flatMap {
            ServerResponse.ok().syncBody(it)
        }
}

class PingHandlerWithRestTemplate(private val restTemplate: RestTemplate) {
    fun ping(serverRequest: ServerRequest) = Mono.fromCallable {
        restTemplate.getForEntity("http://rest-server-poc/api/ping", String::class.java)
    }.flatMap {
        ServerResponse.ok().syncBody(it.body!!)
    }
}

Add to application.yaml:

context.initializer.classes: com.lbpoc.client.BeansInitializer
spring:
  application:
    name: rest-client-poc-for-load-balancing
logging:
  level.org.springframework.cloud: DEBUG
  level.com.netflix.loadbalancer: DEBUG
rest-server-poc:
  listOfServers: localhost:8081,localhost:8082

Dependencies in gradle:

implementation('org.springframework.boot:spring-boot-starter-webflux')
implementation('org.springframework.cloud:spring-cloud-starter-netflix-ribbon')

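You can try it with two or more server instances; it works exactly the same with WebClient and RestTemplate.

Ribbon uses ZoneAwareLoadBalancer by default, and if you have only one zone, all server instances will be registered in the "unknown" zone.

You might have a problem with WebClient keeping connections alive. WebClient reuses the same connection across multiple requests, whereas RestTemplate does not. If you have some kind of proxy between your client and server, the connection reuse in WebClient might be the problem. To verify it, you can modify the WebClient bean like this and run your tests: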

@Bean
@LoadBalanced
fun webClientBuilder(): WebClient.Builder {
    return WebClient.builder()
        .clientConnector(ReactorClientHttpConnector { options ->
            options
                .compression(true)
                .afterNettyContextInit { ctx ->
                    ctx.markPersistent(false)
                }
        })
}

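Of course this is not a good solution for production, but it lets you check whether the problem is in the configuration inside your client application or outside it, somewhere between your client and server. For example, if you are using Kubernetes and register your services in service discovery using the server node IP address, then every call to such a service goes through the kube-proxy load balancer and is routed (round robin is used by default) to some pod of that service.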
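
To illustrate why connection reuse can matter here, below is a toy simulation in plain Java. It assumes a proxy that picks a backend round robin only when a connection is opened (a hypothetical model, not a statement about how your particular proxy behaves), and compares a client that keeps one connection with a client that opens a new connection for every request.

```java
import java.util.Arrays;

// Toy model with made-up names: a connection-level proxy in front of N backends.
final class ConnectionReuseSketch {

    /** Picks a backend round robin, but only at the moment a connection is opened. */
    static final class Proxy {
        private final int backends;
        private int next = 0;

        Proxy(int backends) {
            this.backends = backends;
        }

        int openConnection() {
            int chosen = next;
            next = (next + 1) % backends;
            return chosen;
        }
    }

    /** Returns the number of requests each backend received. */
    static int[] simulate(int backends, int requests, boolean reuseConnection) {
        Proxy proxy = new Proxy(backends);
        int[] hits = new int[backends];
        int pinned = -1; // backend behind the currently open connection, -1 if none
        for (int i = 0; i < requests; i++) {
            if (!reuseConnection || pinned < 0) {
                pinned = proxy.openConnection();
            }
            hits[pinned]++; // every request on a connection reaches the same backend
        }
        return hits;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(simulate(3, 9, true)));  // prints [9, 0, 0]
        System.out.println(Arrays.toString(simulate(3, 9, false))); // prints [3, 3, 3]
    }
}
```

A real client pools several connections, so the skew is usually less extreme than in this model, but the direction is the same: long-lived connections pin traffic to whichever backend the proxy picked when each connection was opened.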