Spring-Integration: 外部传送名单
Spring-Integration: external routing slip
我想允许来电者传递外部 routing slip,例如通过发布:
POST http://localhost:8080/transform?routing-slip=capitalize&routing-slip=lowercase
Content-Type: text/plain
camelCase
应该可以使用给定的 routing-slip 数组作为来自 pojo 的外部路由表:
@Bean
public IntegrationFlow transformerChain(RoutingSlipRouteStrategy routeStrategy) {
return IntegrationFlows.from(
Http.inboundGateway("/transform")
.headerExpression("routingSlipParam",
"#requestParams['routing-slip']")
.requestPayloadType(String.class))
.enrichHeaders(spec -> spec.header(
IntegrationMessageHeaderAccessor.ROUTING_SLIP,
new RoutingSlipHeaderValueMessageProcessor(
"@routePojo.get(request, reply)")
)
)
.logAndReply();
}
pojo 可以访问 routingSlipParam
header,你会认为它可以将单据保存为内部状态,或者至少 TestRoutingSlipRoutePojo 让我相信,所以我构建了这个(有点怀疑,因为只有一个 pojo 实例):
public class ExternalRoutingSlipRoutePojo {
private List<String> routingSlip;
private int i = 0;
public String get(Message<?> requestMessage, Object reply) {
if (routingSlip == null) {
routingSlip = (LinkedList)requestMessage.getHeaders()
.get("routingSlipParam");
}
try {
return this.routingSlip.get(i++);
} catch (Exception e) {
return null;
}
}
}
事实证明,这只有效一次,毕竟这并不奇怪 - 每条传入消息都会递增索引,并且路由名单永远不会更新。
所以我想,当然,我必须保留所有传入消息的内部状态,并想出了这个 RouteStrategy:
public class ExternalRoutingSlipRouteStrategy implements RoutingSlipRouteStrategy {
private Map<UUID, LinkedList<String>> routingSlips = new WeakHashMap<>();
private static final LinkedList EMPTY_ROUTINGSLIP = new LinkedList<>();
@Override
public Object getNextPath(Message<?> requestMessage,Object reply) {
MessageHeaders headers = requestMessage.getHeaders();
UUID id = headers.getId();
if (!routingSlips.containsKey(id)) {
@SuppressWarnings("unchecked")
List<String> routingSlipParam =
headers.get("routingSlipParam", List.class);
if (routingSlipParam != null) {
routingSlips.put(id,
new LinkedList<>(routingSlipParam));
}
}
LinkedList<String> routingSlip = routingSlips.getOrDefault(id,
EMPTY_ROUTINGSLIP);
String nextPath = routingSlip.poll();
if (nextPath == null) {
routingSlips.remove(id);
}
return nextPath;
}
}
这也行不通,因为不仅会为传入消息调用该策略,还会为动态路由创建的所有新消息调用该策略,这些消息当然具有不同的 ID。
但它只为原始消息调用两次,因此路由单永远不会用完,应用程序将无限循环运行。
如何让 spring-integration 使用外部传送名单?
更新:
正如 Gary Russel 所建议的,外部路由单索引和外部路由单本身都不应该存储在 Spring bean 中,而是可以使用消息 headers 来单独维护它们对于每个请求:
Http.inboundGateway("/transform")
.headerExpression("routingSlipParam",
"#requestParams['routing-slip']")
.requestPayloadType(String.class))
.enrichHeaders(spec -> spec
.headerFunction("counter",h -> new AtomicInteger())
.header(IntegrationMessageHeaderAccessor.ROUTING_SLIP,
new RoutingSlipHeaderValueMessageProcessor(externalRouteStrategy)
)
)
externalRouteStrategy
是以下 class 的实例:
public class ExternalRoutingSlipRouteStrategy implements
RoutingSlipRouteStrategy {
@Override
public Object getNextPath(Message<?> requestMessage, Object reply) {
List<String> routingSlip = (List<String>)
requestMessage.getHeaders().get("routingSlipParam");
int routingSlipIndex = requestMessage.getHeaders()
.get("counter", AtomicInteger.class)
.getAndIncrement();
String routingSlipEntry;
if (routingSlip != null
&& routingSlipIndex < routingSlip.size()) {
routingSlipEntry = routingSlip.get(routingSlipIndex);
} else {
routingSlipEntry = null;
}
return routingSlipEntry;
}
}
作为参考,我已经在 Github 中发布了示例。
返回到您的第一个版本并将 i
存储在消息 header (AtomicInteger
) 中的 header enricher 中。
.headerExpression("counter", "new java.util.concurrent.atomic.AtomicInteger()")
然后
int i = requestMessage.getHeaders().get("counter", AtomicInteger.class).getAndIncrement();
我想允许来电者传递外部 routing slip,例如通过发布:
POST http://localhost:8080/transform?routing-slip=capitalize&routing-slip=lowercase
Content-Type: text/plain
camelCase
应该可以使用给定的 routing-slip 数组作为来自 pojo 的外部路由表:
@Bean
public IntegrationFlow transformerChain(RoutingSlipRouteStrategy routeStrategy) {
return IntegrationFlows.from(
Http.inboundGateway("/transform")
.headerExpression("routingSlipParam",
"#requestParams['routing-slip']")
.requestPayloadType(String.class))
.enrichHeaders(spec -> spec.header(
IntegrationMessageHeaderAccessor.ROUTING_SLIP,
new RoutingSlipHeaderValueMessageProcessor(
"@routePojo.get(request, reply)")
)
)
.logAndReply();
}
pojo 可以访问 routingSlipParam
header,你会认为它可以将单据保存为内部状态,或者至少 TestRoutingSlipRoutePojo 让我相信,所以我构建了这个(有点怀疑,因为只有一个 pojo 实例):
public class ExternalRoutingSlipRoutePojo {
private List<String> routingSlip;
private int i = 0;
public String get(Message<?> requestMessage, Object reply) {
if (routingSlip == null) {
routingSlip = (LinkedList)requestMessage.getHeaders()
.get("routingSlipParam");
}
try {
return this.routingSlip.get(i++);
} catch (Exception e) {
return null;
}
}
}
事实证明,这只有效一次,毕竟这并不奇怪 - 每条传入消息都会递增索引,并且路由名单永远不会更新。
所以我想,当然,我必须保留所有传入消息的内部状态,并想出了这个 RouteStrategy:
public class ExternalRoutingSlipRouteStrategy implements RoutingSlipRouteStrategy {
private Map<UUID, LinkedList<String>> routingSlips = new WeakHashMap<>();
private static final LinkedList EMPTY_ROUTINGSLIP = new LinkedList<>();
@Override
public Object getNextPath(Message<?> requestMessage,Object reply) {
MessageHeaders headers = requestMessage.getHeaders();
UUID id = headers.getId();
if (!routingSlips.containsKey(id)) {
@SuppressWarnings("unchecked")
List<String> routingSlipParam =
headers.get("routingSlipParam", List.class);
if (routingSlipParam != null) {
routingSlips.put(id,
new LinkedList<>(routingSlipParam));
}
}
LinkedList<String> routingSlip = routingSlips.getOrDefault(id,
EMPTY_ROUTINGSLIP);
String nextPath = routingSlip.poll();
if (nextPath == null) {
routingSlips.remove(id);
}
return nextPath;
}
}
这也行不通,因为不仅会为传入消息调用该策略,还会为动态路由创建的所有新消息调用该策略,这些消息当然具有不同的 ID。
但它只为原始消息调用两次,因此路由单永远不会用完,应用程序将无限循环运行。
如何让 spring-integration 使用外部传送名单?
更新:
正如 Gary Russel 所建议的,外部路由单索引和外部路由单本身都不应该存储在 Spring bean 中,而是可以使用消息 headers 来单独维护它们对于每个请求:
Http.inboundGateway("/transform")
.headerExpression("routingSlipParam",
"#requestParams['routing-slip']")
.requestPayloadType(String.class))
.enrichHeaders(spec -> spec
.headerFunction("counter",h -> new AtomicInteger())
.header(IntegrationMessageHeaderAccessor.ROUTING_SLIP,
new RoutingSlipHeaderValueMessageProcessor(externalRouteStrategy)
)
)
externalRouteStrategy
是以下 class 的实例:
public class ExternalRoutingSlipRouteStrategy implements
RoutingSlipRouteStrategy {
@Override
public Object getNextPath(Message<?> requestMessage, Object reply) {
List<String> routingSlip = (List<String>)
requestMessage.getHeaders().get("routingSlipParam");
int routingSlipIndex = requestMessage.getHeaders()
.get("counter", AtomicInteger.class)
.getAndIncrement();
String routingSlipEntry;
if (routingSlip != null
&& routingSlipIndex < routingSlip.size()) {
routingSlipEntry = routingSlip.get(routingSlipIndex);
} else {
routingSlipEntry = null;
}
return routingSlipEntry;
}
}
作为参考,我已经在 Github 中发布了示例。
返回到您的第一个版本并将 i
存储在消息 header (AtomicInteger
) 中的 header enricher 中。
.headerExpression("counter", "new java.util.concurrent.atomic.AtomicInteger()")
然后
int i = requestMessage.getHeaders().get("counter", AtomicInteger.class).getAndIncrement();