How to set up several different WebFlux client properties for different Apache Camel routes?
In the route setup, we call WebClient.builder() to set up the client before the route declaration:
@Override
public void configure() {
    createSubscription(activeProfile.equalsIgnoreCase("RESTART"));
    from(String.format("reactive-streams:%s", streamName))
        .to("log:camel.proxy?level=INFO&groupInterval=500000")
        .to(String.format("kafka:%s?brokers=%s", kafkaTopic, kafkaBrokerUrls));
}
private void createSubscription(boolean restart) {
    WebClient.builder()
        .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.TEXT_XML_VALUE)
        .build()
        .post()
        .uri(initialRequestUri)
        .body(BodyInserters.fromObject(restart ? String.format(restartRequestBody, ZonedDateTime.now(ZoneId.of("UTC")).toString().replace("[UTC]", "")) : initialRequestBody))
        .retrieve()
        .bodyToMono(String.class)
        .map(initResp ->
            new JSONObject(initResp)
                .getJSONObject("RESPONSE")
                .getJSONArray("RESULT")
                .getJSONObject(0)
                .getJSONObject("INFO")
                .getString("SSEURL")
        )
        .flatMapMany(url -> {
            log.info(url);
            return WebClient.create()
                .get()
                .uri(url)
                .retrieve()
                .bodyToFlux(new ParameterizedTypeReference<ServerSentEvent<String>>() {
                })
                .flatMap(sse -> {
                    val data = new JSONObject(sse.data())
                        .getJSONObject("RESPONSE")
                        .getJSONArray("RESULT")
                        .getJSONObject(0)
                        .getJSONArray(apiName);
                    val list = new ArrayList<String>();
                    for (int i = 0; i < data.length(); i++) {
                        list.add(data.getJSONObject(i).toString());
                    }
                    return Flux.fromIterable(list);
                });
        })
        .onBackpressureBuffer()
        .flatMap(msg -> camelReactiveStreamsService.toStream(streamName, msg, String.class))
        .doFirst(() -> log.info(String.format("Reactive stream %s was %s", streamName, restart ? "restarted" : "started")))
        .doOnError(err -> {
            log.error(String.format("Reactive stream %s has terminated with error, restarting", streamName), err);
            createSubscription(true);
        })
        .doOnComplete(() -> {
            log.warn(String.format("Reactive stream %s has completed, restarting", streamName));
            createSubscription(true);
        })
        .subscribe();
}
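As I understand it, the WebClient setup applies to the whole Spring Boot application rather than to a specific Apache Camel route (it is not tied to a specific route ID or URL in any way). That is why new routes with new reactive streams, which use other URLs and need other headers or initial messages, would get this setup too, which is not wanted.
So the question is: is it possible to create a WebClient setup that is associated with a specific route rather than with the whole application, and have it applied to that route only?
Is such a configuration possible with the Spring DSL?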
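To make the goal concrete, here is a minimal sketch of what "settings per route" could look like, using plain Java only. The names `RouteClientSettings` and `RouteClientRegistry` are hypothetical and not part of Spring or Camel; the idea is just that each route ID maps to its own URI and default headers, from which a separate client could then be built. As far as I can tell, each call to `WebClient.builder()` returns a fresh builder, so a client built from one route's settings should not carry another route's default headers.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical holder for the client settings that belong to one route. */
final class RouteClientSettings {
    final String initialRequestUri;
    final Map<String, String> defaultHeaders;

    RouteClientSettings(String initialRequestUri, Map<String, String> defaultHeaders) {
        this.initialRequestUri = initialRequestUri;
        // Defensive copy so later changes to the caller's map do not leak in.
        this.defaultHeaders = Collections.unmodifiableMap(new LinkedHashMap<>(defaultHeaders));
    }
}

/** Hypothetical registry that maps a Camel route ID to its own settings. */
final class RouteClientRegistry {
    private final Map<String, RouteClientSettings> byRouteId = new ConcurrentHashMap<>();

    void register(String routeId, RouteClientSettings settings) {
        byRouteId.put(routeId, settings);
    }

    RouteClientSettings forRoute(String routeId) {
        RouteClientSettings settings = byRouteId.get(routeId);
        if (settings == null) {
            throw new IllegalArgumentException("No client settings registered for route " + routeId);
        }
        return settings;
    }
}
```

In the subscription method, the headers from `forRoute(routeId).defaultHeaders` could then be applied one by one with `defaultHeader(name, value)` on a new builder, instead of hard-coding `Content-Type`.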
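With the Spring DSL, applying it is more complicated:
create 2 routes, where the first one is executed once at the start and triggers a specific method of a specific bean, passing the settings for WebClient.builder() as method parameters and subscribing to WebFlux. And yes, the reactive stream setup is done in the Spring context of the Spring Boot application, not in the Apache Camel context. So it has no direct association with the route, other than that the setup is called when the specific route starts. So the routes look like: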
<?xml version="1.0" encoding="UTF-8"?>
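Provide the bean. I have put it into the Spring Boot application, not into the Apache Camel context, as shown below. The drawback is that I have to put it there regardless of whether the specific route is active or even present, so it is always in memory.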
import org.apache.camel.CamelContext;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
import org.json.JSONArray;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList;
@Component
public class WebFluxSetUp {
    private final Logger logger = LoggerFactory.getLogger(WebFluxSetUp.class);
    private final CamelContext camelContext;
    private final CamelReactiveStreamsService camelReactiveStreamsService;
    WebFluxSetUp(CamelContext camelContext, CamelReactiveStreamsService camelReactiveStreamsService) {
        this.camelContext = camelContext;
        this.camelReactiveStreamsService = camelReactiveStreamsService;
    }
    public void executeWebfluxSetup(boolean restart, String initialRequestUri, String restartRequestBody, String initialRequestBody, String apiName, String streamName) {
        WebClient.builder()
            .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.TEXT_XML_VALUE)
            .build()
            .post()
            .uri(initialRequestUri)
            .body(BodyInserters.fromObject(restart ? String.format(restartRequestBody, ZonedDateTime.now(ZoneId.of("UTC")).toString().replace("[UTC]", "")) : initialRequestBody))
            .retrieve()
            .bodyToMono(String.class)
            .map(initResp -> new JSONObject(initResp)
                .getJSONObject("RESPONSE")
                .getJSONArray("RESULT")
                .getJSONObject(0)
                .getJSONObject("INFO")
                .getString("SSEURL"))
            .flatMapMany(url -> {
                logger.info(url);
                return WebClient.create()
                    .get()
                    .uri(url)
                    .retrieve()
                    .bodyToFlux(new ParameterizedTypeReference<ServerSentEvent<String>>() {
                    })
                    .flatMap(sse -> {
                        JSONArray data = new JSONObject(sse.data())
                            .getJSONObject("RESPONSE")
                            .getJSONArray("RESULT")
                            .getJSONObject(0)
                            .getJSONArray(apiName);
                        ArrayList<String> list = new ArrayList<String>();
                        for (int i = 0; i < data.length(); i++) {
                            list.add(data.getJSONObject(i).toString());
                        }
                        return Flux.fromIterable(list);
                    });
            })
            .onBackpressureBuffer()
            .flatMap(msg -> camelReactiveStreamsService.toStream(streamName, msg, String.class))
            .doFirst(() -> logger.info(String.format("Reactive stream %s was %s", streamName, restart ? "restarted" : "started")))
            .doOnError(err -> {
                logger.error(String.format("Reactive stream %s has terminated with error, restarting", streamName), err);
                executeWebfluxSetup(true, initialRequestUri, restartRequestBody, initialRequestBody, apiName, streamName);
            })
            .doOnComplete(() -> {
                logger.warn(String.format("Reactive stream %s has completed, restarting", streamName));
                executeWebfluxSetup(true, initialRequestUri, restartRequestBody, initialRequestBody, apiName, streamName);
            })
            .subscribe();
    }
}
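There is another drawback: when the route is stopped, the WebFlux client still keeps spamming the reactive stream URL. And there is no route-related API or event handler to stop it without hard-coding it to a specific route.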
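What I am looking for is roughly the following, sketched in plain Java under some assumptions. `RouteSubscriptions` is a hypothetical helper, not an existing Camel or Spring class, and the `Runnable` stands in for whatever cancels the stream (with Reactor that would presumably be the `dispose()` method of the `Disposable` returned by `subscribe()`).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical registry of cancel actions, one per route ID. */
final class RouteSubscriptions {
    private final Map<String, Runnable> cancelByRouteId = new ConcurrentHashMap<>();

    /** Stores the cancel action for a route, cancelling any previous one first. */
    void track(String routeId, Runnable cancel) {
        Runnable previous = cancelByRouteId.put(routeId, cancel);
        if (previous != null) {
            previous.run();
        }
    }

    /** Cancels and forgets the subscription of a route; returns false if none was tracked. */
    boolean stop(String routeId) {
        Runnable cancel = cancelByRouteId.remove(routeId);
        if (cancel == null) {
            return false;
        }
        cancel.run();
        return true;
    }
}
```

The missing piece is the hook that calls `stop(routeId)`. A Camel `RoutePolicy` with its `onStop(Route)` callback looks like a candidate, but I have not verified that it fits this setup.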