Spring 集成 DSL 语法问题 - 如何动态构建子流?
Spring Integration DSL Syntax problem - how to dynamically construct subflows?
我正在尝试在 Spring 集成中构建复杂流程,其中子流程在 运行 时动态定义。在主流程定义中运行良好的代码在子流程定义中无法编译。由于构造 看起来 完全相同,因此不清楚发生了什么。任何解释将不胜感激。
提前致谢。
主流程定义编码如下:
StandardIntegrationFlow flow = IntegrationFlows
.from(setupAdapter,
c -> c.poller(Pollers.fixedRate(1000L, TimeUnit.MILLISECONDS).maxMessagesPerPoll(1)))
// This one compiles fine
.enrichHeaders(h -> h.headerExpression("start", "start\")")
.headerExpression("end", "payload[0].get(\"end\")"))
.split(tableSplitter)
.enrichHeaders(h -> h.headerExpression("object", "payload[0].get(\"object\")"))
.channel(c -> c.executor(stepTaskExecutor))
.routeToRecipients(r -> this.buildRecipientListRouterSpecForRules(r, rules))
.aggregate()
.handle(cleanupAdapter).get();
buildRecipientListRouterSpecForRules 定义为:
private RecipientListRouterSpec buildRecipientListRouterSpecForRules(RecipientListRouterSpec recipientListSpec,
Collection<RuleMetadata> rules) {
rules.forEach(
rule -> recipientListSpec.recipientFlow(getFilterExpression(rule), f -> createFlowDefForRule(f, rule)));
return recipientListSpec;
}
createFlowDefForRule() 只是一个 switch()
包装器,用于为规则定义的流选择哪个实际 DSL 运行。这是一个示例
public IntegrationFlowDefinition constructASpecificFlowDef(IntegrationFlowDefinition flowDef, RuleMetadata rule) {
return flowDef
// This enrichHeaders element fails to compile,
// The method headerExpression(String, String) is undefined for the type Object
.enrichHeaders(h -> h.headerExpression("ALC_operation", "payload[0].get(\"ALC_operation\")"));
}
一般来说,这样的解释最好放在问题文本中,而不是作为代码片段中的注释;我完全错过了那个评论。
您能否提供一个精简(更简单)的示例(完整 class)来展示此行为,以便我们使用它?
我试图简化您正在做的事情,并且编译正常并按预期工作:
@SpringBootApplication
public class So65010958Application {
public static void main(String[] args) {
SpringApplication.run(So65010958Application.class, args);
}
@Bean
IntegrationFlow flow() {
return IntegrationFlows.from("foo")
.routeToRecipients(r -> r.recipientFlow("true", f -> buildFlow(f)))
.get();
}
private IntegrationFlowDefinition<?> buildFlow(IntegrationFlowDefinition<?> f) {
return f.enrichHeaders(h -> h.headerExpression("foo", "'bar'"))
.channel(MessageChannels.queue("bar"));
}
@Bean
public ApplicationRunner runner(MessageChannel foo, PollableChannel bar) {
return args -> {
foo.send(new GenericMessage<>("foo"));
System.out.println(bar.receive(0));
};
}
}
GenericMessage [payload=foo, headers={foo=bar, id=d526b8fb-c6f8-7731-b1ad-e68e326fcc00, timestamp=1606333567749}]
所以,我一定是漏掉了什么。
我正在尝试在 Spring 集成中构建复杂流程,其中子流程在 运行 时动态定义。在主流程定义中运行良好的代码在子流程定义中无法编译。由于构造 看起来 完全相同,因此不清楚发生了什么。任何解释将不胜感激。
提前致谢。
主流程定义编码如下:
StandardIntegrationFlow flow = IntegrationFlows
.from(setupAdapter,
c -> c.poller(Pollers.fixedRate(1000L, TimeUnit.MILLISECONDS).maxMessagesPerPoll(1)))
// This one compiles fine
.enrichHeaders(h -> h.headerExpression("start", "start\")")
.headerExpression("end", "payload[0].get(\"end\")"))
.split(tableSplitter)
.enrichHeaders(h -> h.headerExpression("object", "payload[0].get(\"object\")"))
.channel(c -> c.executor(stepTaskExecutor))
.routeToRecipients(r -> this.buildRecipientListRouterSpecForRules(r, rules))
.aggregate()
.handle(cleanupAdapter).get();
buildRecipientListRouterSpecForRules 定义为:
private RecipientListRouterSpec buildRecipientListRouterSpecForRules(RecipientListRouterSpec recipientListSpec,
Collection<RuleMetadata> rules) {
rules.forEach(
rule -> recipientListSpec.recipientFlow(getFilterExpression(rule), f -> createFlowDefForRule(f, rule)));
return recipientListSpec;
}
createFlowDefForRule() 只是一个 switch()
包装器,用于为规则定义的流选择哪个实际 DSL 运行。这是一个示例
public IntegrationFlowDefinition constructASpecificFlowDef(IntegrationFlowDefinition flowDef, RuleMetadata rule) {
return flowDef
// This enrichHeaders element fails to compile,
// The method headerExpression(String, String) is undefined for the type Object
.enrichHeaders(h -> h.headerExpression("ALC_operation", "payload[0].get(\"ALC_operation\")"));
}
一般来说,这样的解释最好放在问题文本中,而不是作为代码片段中的注释;我完全错过了那个评论。
您能否提供一个精简(更简单)的示例(完整 class)来展示此行为,以便我们使用它?
我试图简化您正在做的事情,并且编译正常并按预期工作:
@SpringBootApplication
public class So65010958Application {
public static void main(String[] args) {
SpringApplication.run(So65010958Application.class, args);
}
@Bean
IntegrationFlow flow() {
return IntegrationFlows.from("foo")
.routeToRecipients(r -> r.recipientFlow("true", f -> buildFlow(f)))
.get();
}
private IntegrationFlowDefinition<?> buildFlow(IntegrationFlowDefinition<?> f) {
return f.enrichHeaders(h -> h.headerExpression("foo", "'bar'"))
.channel(MessageChannels.queue("bar"));
}
@Bean
public ApplicationRunner runner(MessageChannel foo, PollableChannel bar) {
return args -> {
foo.send(new GenericMessage<>("foo"));
System.out.println(bar.receive(0));
};
}
}
GenericMessage [payload=foo, headers={foo=bar, id=d526b8fb-c6f8-7731-b1ad-e68e326fcc00, timestamp=1606333567749}]
所以,我一定是漏掉了什么。