Spring 集成从 REST 服务获取分页结果
Spring Integration fetch paginated results from a REST service
我正在与 REST 服务集成,想法是它由 HttpRequestExecutingMessageHandler
实现的出站网关 marketingCategoryOutboundGateway
轮询。网关向 REST 服务发出请求并将其响应推送到 marketingCategory
通道。网关本身由 marketingCategoryPollerMessageSource
使用 makeTriggeringMessage
工厂方法创建的消息触发。
问题是服务 returns 分页结果。我 something 会在 marketingCategory
频道上收听,除了我已经拥有的服务激活器之外,检查是否响应并推送一条新消息,其中包含由 [创建的递增页码=16=] 到 marketingCategoryPoller
通道,这样代码就会循环,直到它从 REST 服务获取所有页面。
Spring 集成是否允许使这样的过滤器在输入通道上接收一条消息,根据条件对其进行测试,并在条件为真时将新消息推送到输出通道?
代码:
//Responses from the REST service go to this channel
@Bean("marketingCategory")
MessageChannel marketingCategory() { return new PublishSubscribeChannel();}
//This channel is used to trigger the outbound gateway which makes a request to the REST service
@Bean
MessageChannel marketingCategoryPoller() {return new DirectChannel();}
//An adapter creating triggering messages for the gateway
@Bean
@InboundChannelAdapter(channel = "marketingCategoryPoller", poller = @Poller(fixedDelay = "15000"))
public MessageSource<String> marketingCategoryPollerMessageSource() { return () -> makeTriggeringMessage(1);}
//A factory for producing messages which trigger the gateway
private Message<String> makeTriggeringMessage(int page) {
//make a message for triggering marketingCategoryOutboundGateway
return MessageBuilder.withPayload("")
.setHeader("Host", "eclinic")
.setHeader("page", page)
.build();
}
//An outbound gateway, makes a request to the REST service and returns the response to marketingCategory channel
@Bean
@ServiceActivator(inputChannel = "marketingCategoryPoller")
public MessageHandler marketingCategoryOutboundGateway(@Qualifier("marketingCategory") MessageChannel channel) {
//make a request to the REST service and push the response to the marketingCategory channel
}
//handler for REST service responses
@Bean
@ServiceActivator(inputChannel = "marketingCategory")
public MessageHandler marketingCategoryHandler() {
return (msg) -> {
//process the categories returned by marketingCategoryOutboundGateway
};
}
我根据这篇帖子找到了解决方案 Read and download from a paginated REST-Services with spring integration:
触发与 REST 服务对话的出站网关,并使用带轮询器的入站通道适配器将响应推送到通道。
入站通道适配器是一个消息源,它最初生成一条消息,其中 header 指示要从 REST API 获取的页码。
出站网关使用页面消息 header 生成一个 url 指定所需的页面
出站网关推送REST服务响应的通道有2个订阅者:
2.1。一个服务激活器,它对获取的数据做一些事情
2.2。一个过滤器,检查这是否是最后一页,如果不是,它将消息进一步发送到 header enricher
使用的另一个通道
收到消息后,header enricher 增加其页面 header 并将消息进一步推送到触发出站网关的通道,
网关读取增加的页面 header 并从 REST 服务中获取下一页
循环不断旋转,直到 REST 服务 returns 最后一页。过滤器不允许此消息传递给 header enricher 以打破循环。
完整代码:
@Configuration
public class IntegrationConfiguration {
private final ApiGateConfig apiGateConfig;
IntegrationConfiguration(ApiGateConfig apiGateConfig) {
this.apiGateConfig = apiGateConfig;
}
@Bean("marketingCategory")
MessageChannel marketingCategory() {
return new PublishSubscribeChannel();
}
@Bean
MessageChannel marketingCategoryPoller() {
return new DirectChannel();
}
@Bean
MessageChannel marketingCategoryPollerNextPage() {
return new DirectChannel();
}
@Bean
@InboundChannelAdapter(channel = "marketingCategoryPoller", poller = @Poller(fixedDelay = "15000"))
public MessageSource<RestPageImpl<MarketingCategory>> marketingCategoryPollerMessageSource() {
return () -> makeTriggeringMessage(0);
}
/**
* Build a gateway triggering message
*/
private Message<RestPageImpl<MarketingCategory>> makeTriggeringMessage(int page) {
return MessageBuilder.withPayload(new RestPageImpl<MarketingCategory>())
.setHeader("Host", "eclinic")
.setHeader("page", page)
.build();
}
@Bean
@ServiceActivator(inputChannel = "marketingCategoryPoller")
public MessageHandler marketingCategoryOutboundGateway(@Qualifier("marketingCategory") MessageChannel channel) {
String uri = apiGateConfig.getUri() + "/marketingCategories?page={page}";
//the type of the payload
ParameterizedTypeReference<RestPageImpl<MarketingCategory>> type = new ParameterizedTypeReference<>() {
};
//page number comes from the message
SpelExpressionParser expressionParser = new SpelExpressionParser();
var uriVariables = new HashMap<String, Expression>();
uriVariables.put("page", expressionParser.parseExpression("headers.page"));
HttpRequestExecutingMessageHandler handler = new HttpRequestExecutingMessageHandler(uri);
handler.setHttpMethod(HttpMethod.GET);
handler.setExpectedResponseTypeExpression(new ValueExpression<>(type));
handler.setOutputChannel(channel);
handler.setUriVariableExpressions(uriVariables);
return handler;
}
@Bean
@ServiceActivator(inputChannel = "marketingCategory")
public MessageHandler marketingCategoryHandler() {
return (msg) -> {
var page = (RestPageImpl<MarketingCategory>) msg.getPayload();
System.out.println("Page #" + page.getNumber());
page.getContent().forEach(c -> System.out.println(c.getMarketingCategory()));
};
}
@Filter(inputChannel = "marketingCategory", outputChannel = "marketingCategoryPollerNextPage")
public boolean marketingCategoryPaginationFilter(RestPageImpl<MarketingCategory> page) {
return !page.isLast();
}
@Bean
@Transformer(inputChannel = "marketingCategoryPollerNextPage", outputChannel = "marketingCategoryPoller")
HeaderEnricher incrementPage() {
Map<String, HeaderValueMessageProcessor<?>> headersToAdd = new HashMap<>();
Expression expression = new SpelExpressionParser().parseExpression("headers.page+1");
var valueProcessor = new ExpressionEvaluatingHeaderValueMessageProcessor<>(expression, Integer.class);
valueProcessor.setOverwrite(true);
headersToAdd.put("page", valueProcessor);
return new HeaderEnricher(headersToAdd);
}
}
我正在与 REST 服务集成,想法是它由 HttpRequestExecutingMessageHandler
实现的出站网关 marketingCategoryOutboundGateway
轮询。网关向 REST 服务发出请求并将其响应推送到 marketingCategory
通道。网关本身由 marketingCategoryPollerMessageSource
使用 makeTriggeringMessage
工厂方法创建的消息触发。
问题是服务 returns 分页结果。我 something 会在 marketingCategory
频道上收听,除了我已经拥有的服务激活器之外,检查是否响应并推送一条新消息,其中包含由 [创建的递增页码=16=] 到 marketingCategoryPoller
通道,这样代码就会循环,直到它从 REST 服务获取所有页面。
Spring 集成是否允许使这样的过滤器在输入通道上接收一条消息,根据条件对其进行测试,并在条件为真时将新消息推送到输出通道?
代码:
//Responses from the REST service go to this channel
@Bean("marketingCategory")
MessageChannel marketingCategory() { return new PublishSubscribeChannel();}
//This channel is used to trigger the outbound gateway which makes a request to the REST service
@Bean
MessageChannel marketingCategoryPoller() {return new DirectChannel();}
//An adapter creating triggering messages for the gateway
@Bean
@InboundChannelAdapter(channel = "marketingCategoryPoller", poller = @Poller(fixedDelay = "15000"))
public MessageSource<String> marketingCategoryPollerMessageSource() { return () -> makeTriggeringMessage(1);}
//A factory for producing messages which trigger the gateway
private Message<String> makeTriggeringMessage(int page) {
//make a message for triggering marketingCategoryOutboundGateway
return MessageBuilder.withPayload("")
.setHeader("Host", "eclinic")
.setHeader("page", page)
.build();
}
//An outbound gateway, makes a request to the REST service and returns the response to marketingCategory channel
@Bean
@ServiceActivator(inputChannel = "marketingCategoryPoller")
public MessageHandler marketingCategoryOutboundGateway(@Qualifier("marketingCategory") MessageChannel channel) {
//make a request to the REST service and push the response to the marketingCategory channel
}
//handler for REST service responses
@Bean
@ServiceActivator(inputChannel = "marketingCategory")
public MessageHandler marketingCategoryHandler() {
return (msg) -> {
//process the categories returned by marketingCategoryOutboundGateway
};
}
我根据这篇帖子找到了解决方案 Read and download from a paginated REST-Services with spring integration:
触发与 REST 服务对话的出站网关,并使用带轮询器的入站通道适配器将响应推送到通道。 入站通道适配器是一个消息源,它最初生成一条消息,其中 header 指示要从 REST API 获取的页码。 出站网关使用页面消息 header 生成一个 url 指定所需的页面
出站网关推送REST服务响应的通道有2个订阅者:
2.1。一个服务激活器,它对获取的数据做一些事情
2.2。一个过滤器,检查这是否是最后一页,如果不是,它将消息进一步发送到 header enricher
使用的另一个通道
收到消息后,header enricher 增加其页面 header 并将消息进一步推送到触发出站网关的通道, 网关读取增加的页面 header 并从 REST 服务中获取下一页
循环不断旋转,直到 REST 服务 returns 最后一页。过滤器不允许此消息传递给 header enricher 以打破循环。
完整代码:
@Configuration
public class IntegrationConfiguration {
private final ApiGateConfig apiGateConfig;
IntegrationConfiguration(ApiGateConfig apiGateConfig) {
this.apiGateConfig = apiGateConfig;
}
@Bean("marketingCategory")
MessageChannel marketingCategory() {
return new PublishSubscribeChannel();
}
@Bean
MessageChannel marketingCategoryPoller() {
return new DirectChannel();
}
@Bean
MessageChannel marketingCategoryPollerNextPage() {
return new DirectChannel();
}
@Bean
@InboundChannelAdapter(channel = "marketingCategoryPoller", poller = @Poller(fixedDelay = "15000"))
public MessageSource<RestPageImpl<MarketingCategory>> marketingCategoryPollerMessageSource() {
return () -> makeTriggeringMessage(0);
}
/**
* Build a gateway triggering message
*/
private Message<RestPageImpl<MarketingCategory>> makeTriggeringMessage(int page) {
return MessageBuilder.withPayload(new RestPageImpl<MarketingCategory>())
.setHeader("Host", "eclinic")
.setHeader("page", page)
.build();
}
@Bean
@ServiceActivator(inputChannel = "marketingCategoryPoller")
public MessageHandler marketingCategoryOutboundGateway(@Qualifier("marketingCategory") MessageChannel channel) {
String uri = apiGateConfig.getUri() + "/marketingCategories?page={page}";
//the type of the payload
ParameterizedTypeReference<RestPageImpl<MarketingCategory>> type = new ParameterizedTypeReference<>() {
};
//page number comes from the message
SpelExpressionParser expressionParser = new SpelExpressionParser();
var uriVariables = new HashMap<String, Expression>();
uriVariables.put("page", expressionParser.parseExpression("headers.page"));
HttpRequestExecutingMessageHandler handler = new HttpRequestExecutingMessageHandler(uri);
handler.setHttpMethod(HttpMethod.GET);
handler.setExpectedResponseTypeExpression(new ValueExpression<>(type));
handler.setOutputChannel(channel);
handler.setUriVariableExpressions(uriVariables);
return handler;
}
@Bean
@ServiceActivator(inputChannel = "marketingCategory")
public MessageHandler marketingCategoryHandler() {
return (msg) -> {
var page = (RestPageImpl<MarketingCategory>) msg.getPayload();
System.out.println("Page #" + page.getNumber());
page.getContent().forEach(c -> System.out.println(c.getMarketingCategory()));
};
}
@Filter(inputChannel = "marketingCategory", outputChannel = "marketingCategoryPollerNextPage")
public boolean marketingCategoryPaginationFilter(RestPageImpl<MarketingCategory> page) {
return !page.isLast();
}
@Bean
@Transformer(inputChannel = "marketingCategoryPollerNextPage", outputChannel = "marketingCategoryPoller")
HeaderEnricher incrementPage() {
Map<String, HeaderValueMessageProcessor<?>> headersToAdd = new HashMap<>();
Expression expression = new SpelExpressionParser().parseExpression("headers.page+1");
var valueProcessor = new ExpressionEvaluatingHeaderValueMessageProcessor<>(expression, Integer.class);
valueProcessor.setOverwrite(true);
headersToAdd.put("page", valueProcessor);
return new HeaderEnricher(headersToAdd);
}
}