How can I set up different WebFlux client properties for different Apache Camel routes?

In the route setup, we call WebClient.builder() to configure the client before the route is declared:

  @Override
  public void configure() {
    createSubscription(activeProfile.equalsIgnoreCase("RESTART"));
    from(String.format("reactive-streams:%s", streamName))
        .to("log:camel.proxy?level=INFO&groupInterval=500000")
        .to(String.format("kafka:%s?brokers=%s", kafkaTopic, kafkaBrokerUrls));
  }

  private void createSubscription(boolean restart) {
    WebClient.builder()
        .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.TEXT_XML_VALUE)
        .build()
        .post()
        .uri(initialRequestUri)
        .body(BodyInserters.fromObject(restart ? String.format(restartRequestBody, ZonedDateTime.now(ZoneId.of("UTC")).toString().replace("[UTC]", "")) : initialRequestBody))
        .retrieve()
        .bodyToMono(String.class)
        .map(initResp ->
            new JSONObject(initResp)
                .getJSONObject("RESPONSE")
                .getJSONArray("RESULT")
                .getJSONObject(0)
                .getJSONObject("INFO")
                .getString("SSEURL")
        )
        .flatMapMany(url -> {
          log.info(url);
          return WebClient.create()
              .get()
              .uri(url)
              .retrieve()
              .bodyToFlux(new ParameterizedTypeReference<ServerSentEvent<String>>() {
              })
              .flatMap(sse -> {
                    val data = new JSONObject(sse.data())
                        .getJSONObject("RESPONSE")
                        .getJSONArray("RESULT")
                        .getJSONObject(0)
                        .getJSONArray(apiName);
                    val list = new ArrayList<String>();
                    for (int i = 0; i < data.length(); i++) {
                      list.add(data.getJSONObject(i).toString());
                    }
                    return Flux.fromIterable(list);
                  }
              );
            }
        )
        .onBackpressureBuffer()
        .flatMap(msg -> camelReactiveStreamsService.toStream(streamName, msg, String.class))
        .doFirst(() -> log.info(String.format("Reactive stream %s was %s", streamName, restart ? "restarted" : "started")))
        .doOnError(err -> {
          log.error(String.format("Reactive stream %s has terminated with error, restarting", streamName), err);
          createSubscription(true);
        })
        .doOnComplete(() -> {
          log.warn(String.format("Reactive stream %s has completed, restarting", streamName));
          createSubscription(true);
        })
        .subscribe();
  }

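As I understand it, the WebClient settings apply to the whole Spring Boot application rather than to a specific Apache Camel route (they are not tied to a particular route id or URL in any way). That is why a new route with a new reactive stream, which uses a different URL and has different requirements for headers and the initial message, would pick up this setup too, which is not wanted.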

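So the question is: is it possible to define a WebClient setup that is associated with a specific route rather than with the whole application, and have it applied to that route only?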
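
To make the question concrete, here is a minimal plain-Java sketch of what "settings per route" could look like: a small registry keyed by route id. The class names `RouteClientSettings` and `RouteClientSettingsRegistry`, the route ids, and the URLs are all hypothetical and are not part of Camel or Spring. The sketch only holds the settings; building a client from each entry is not shown. Since each call to WebClient.builder() starts from a fresh builder, a client built from one entry would presumably carry only that entry's headers.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical holder for the client settings that one route needs. */
final class RouteClientSettings {
    final String initialRequestUri;
    final Map<String, String> defaultHeaders;

    RouteClientSettings(String initialRequestUri, Map<String, String> defaultHeaders) {
        this.initialRequestUri = initialRequestUri;
        // Defensive copy so later changes to the caller's map do not leak in.
        this.defaultHeaders = Collections.unmodifiableMap(new LinkedHashMap<>(defaultHeaders));
    }
}

/** Hypothetical registry that keeps settings per route id instead of application-wide. */
final class RouteClientSettingsRegistry {
    private final Map<String, RouteClientSettings> byRouteId = new ConcurrentHashMap<>();

    void register(String routeId, RouteClientSettings settings) {
        byRouteId.put(routeId, settings);
    }

    /** Returns the settings of the given route, or fails loudly if none were registered. */
    RouteClientSettings forRoute(String routeId) {
        RouteClientSettings settings = byRouteId.get(routeId);
        if (settings == null) {
            throw new IllegalArgumentException("No client settings registered for route " + routeId);
        }
        return settings;
    }

    public static void main(String[] args) {
        RouteClientSettingsRegistry registry = new RouteClientSettingsRegistry();
        // Route ids and URLs below are made up for illustration.
        registry.register("xmlRoute", new RouteClientSettings(
                "http://localhost:8080/a", Collections.singletonMap("Content-Type", "text/xml")));
        registry.register("jsonRoute", new RouteClientSettings(
                "http://localhost:8080/b", Collections.singletonMap("Content-Type", "application/json")));
        System.out.println(registry.forRoute("xmlRoute").defaultHeaders);
        System.out.println(registry.forRoute("jsonRoute").defaultHeaders);
    }
}
```

The point of keying by route id is that the subscription code could look up its own entry instead of sharing one set of defaults across every route.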

Is such a configuration available with the Spring DSL?

The approach there is somewhat more complicated:

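  1. Create 2 routes. The first one runs once, before anything else, and triggers a specific method of a specific bean, passing the settings for WebClient.builder() as method parameters and performing the WebFlux subscription. Yes, the reactive stream setup is done in the Spring context of the Spring Boot application, not in the Apache Camel context. So it has no direct association with the route, apart from the setup being invoked when that specific route starts. The routes look like this: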

     <?xml version="1.0" encoding="UTF-8"?>
    
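  1. Provide the bean. I have placed it in the Spring Boot application rather than in the Apache Camel context, as shown below. The drawback is that it has to live there whether or not the specific route is active or even present, so it stays in memory all the time.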

     import org.apache.camel.CamelContext;
     import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
     import org.json.JSONArray;
     import org.json.JSONObject;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     import org.springframework.core.ParameterizedTypeReference;
     import org.springframework.http.HttpHeaders;
     import org.springframework.http.MediaType;
     import org.springframework.http.codec.ServerSentEvent;
     import org.springframework.stereotype.Component;
     import org.springframework.web.reactive.function.BodyInserters;
     import org.springframework.web.reactive.function.client.WebClient;
     import reactor.core.publisher.Flux;
    
     import java.time.ZoneId;
     import java.time.ZonedDateTime;
     import java.util.ArrayList;
    
     @Component
     public class WebFluxSetUp {
         private final Logger logger = LoggerFactory.getLogger(WebFluxSetUp.class);
         private final CamelContext camelContext;
         private final CamelReactiveStreamsService camelReactiveStreamsService;
    
         WebFluxSetUp(CamelContext camelContext, CamelReactiveStreamsService camelReactiveStreamsService) {
             this.camelContext = camelContext;
             this.camelReactiveStreamsService = camelReactiveStreamsService;
         }
    
         public void executeWebfluxSetup(boolean restart, String initialRequestUri, String restartRequestBody, String initialRequestBody, String apiName, String streamName) {
             // On restart, the current UTC timestamp (without the "[UTC]" zone suffix) is substituted into the request body.
             String body = restart
                     ? String.format(restartRequestBody, ZonedDateTime.now(ZoneId.of("UTC")).toString().replace("[UTC]", ""))
                     : initialRequestBody;
             WebClient.builder()
                     .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.TEXT_XML_VALUE)
                     .build()
                     .post()
                     .uri(initialRequestUri)
                     .body(BodyInserters.fromObject(body))
                     .retrieve()
                     .bodyToMono(String.class)
                     // The initial response carries the URL of the SSE stream to subscribe to.
                     .map(initResp -> new JSONObject(initResp).getJSONObject("RESPONSE").getJSONArray("RESULT").getJSONObject(0).getJSONObject("INFO").getString("SSEURL"))
                     .flatMapMany(url -> {
                         logger.info(url);
                         return WebClient.create().get().uri(url).retrieve()
                                 .bodyToFlux(new ParameterizedTypeReference<ServerSentEvent<String>>() {
                                 })
                                 .flatMap(sse -> {
                                     // Each event holds an array of items; emit them one by one as JSON strings.
                                     JSONArray data = new JSONObject(sse.data()).getJSONObject("RESPONSE").getJSONArray("RESULT").getJSONObject(0).getJSONArray(apiName);
                                     ArrayList<String> list = new ArrayList<String>();
                                     for (int i = 0; i < data.length(); i++) {
                                         list.add(data.getJSONObject(i).toString());
                                     }
                                     return Flux.fromIterable(list);
                                 });
                     })
                     .onBackpressureBuffer()
                     // Push every item into the Camel reactive stream consumed by the route.
                     .flatMap(msg -> camelReactiveStreamsService.toStream(streamName, msg, String.class))
                     .doFirst(() -> logger.info(String.format("Reactive stream %s was %s", streamName, restart ? "restarted" : "started")))
                     .doOnError(err -> {
                         logger.error(String.format("Reactive stream %s has terminated with error, restarting", streamName), err);
                         executeWebfluxSetup(true, initialRequestUri, restartRequestBody, initialRequestBody, apiName, streamName);
                     })
                     .doOnComplete(() -> {
                         logger.warn(String.format("Reactive stream %s has completed, restarting", streamName));
                         executeWebfluxSetup(true, initialRequestUri, restartRequestBody, initialRequestBody, apiName, streamName);
                     })
                     .subscribe();
         }
     }
    
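  2. There is another drawback when the route is stopped: the WebFlux client keeps spamming the reactive stream URL with requests, and there is no route-related API or event handler that could stop it without being hard-coded to a specific route.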
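
The last drawback can be pictured with a minimal plain-Java sketch: keep one cancel handle per route id and run it when that route stops. The class `RouteSubscriptions` and the route id are hypothetical. In Reactor, subscribe() returns a Disposable, so the handle would typically be its dispose method. Where `stop` would be called from is left open; a Camel RoutePolicy with its onStop callback is one candidate, but that wiring is an assumption and is not shown here.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical registry of cancel handles, keyed by route id.
 * With Reactor, a handle would typically wrap the Disposable returned by subscribe().
 */
final class RouteSubscriptions {
    private final Map<String, Runnable> cancelByRouteId = new ConcurrentHashMap<>();

    /** Remembers how to cancel the subscription of a route, cancelling any previous one. */
    void register(String routeId, Runnable cancel) {
        Runnable previous = cancelByRouteId.put(routeId, cancel);
        if (previous != null) {
            previous.run();
        }
    }

    /** Cancels the subscription of the given route; returns false if there was none. */
    boolean stop(String routeId) {
        Runnable cancel = cancelByRouteId.remove(routeId);
        if (cancel == null) {
            return false;
        }
        cancel.run();
        return true;
    }

    public static void main(String[] args) {
        RouteSubscriptions subscriptions = new RouteSubscriptions();
        // "sseRoute" is a made-up route id; the lambda stands in for a real cancel call.
        subscriptions.register("sseRoute", () -> System.out.println("subscription cancelled"));
        System.out.println(subscriptions.stop("sseRoute"));
        System.out.println(subscriptions.stop("sseRoute"));
    }
}
```

Because the setup above resubscribes itself on error and on completion, every new subscription would need to be registered again under the same route id. That is why `register` replaces and cancels the previous handle.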