Spring 集成 DSL 语法问题 - 如何动态构建子流?

Spring Integration DSL Syntax problem - how to dynamically construct subflows?

我正在尝试在 Spring 集成中构建复杂流程,其中子流程在 运行 时动态定义。在主流程定义中运行良好的代码在子流程定义中无法编译。由于构造 看起来 完全相同,因此不清楚发生了什么。任何解释将不胜感激。

提前致谢。

主流程定义编码如下:

        StandardIntegrationFlow flow = IntegrationFlows
            .from(setupAdapter,
                    c -> c.poller(Pollers.fixedRate(1000L, TimeUnit.MILLISECONDS).maxMessagesPerPoll(1)))

// This one compiles fine
            .enrichHeaders(h -> h.headerExpression("start", "start\")")
                    .headerExpression("end", "payload[0].get(\"end\")"))

            .split(tableSplitter)
            .enrichHeaders(h -> h.headerExpression("object", "payload[0].get(\"object\")"))
            .channel(c -> c.executor(stepTaskExecutor))
            .routeToRecipients(r -> this.buildRecipientListRouterSpecForRules(r, rules))
            .aggregate()
            .handle(cleanupAdapter).get();

buildRecipientListRouterSpecForRules 定义为:

    private RecipientListRouterSpec buildRecipientListRouterSpecForRules(RecipientListRouterSpec recipientListSpec,
        Collection<RuleMetadata> rules) {
    rules.forEach(
            rule -> recipientListSpec.recipientFlow(getFilterExpression(rule), f -> createFlowDefForRule(f, rule)));

    return recipientListSpec;
}

createFlowDefForRule() 只是一个 switch() 包装器,用于为规则定义的流选择哪个实际 DSL 运行。这是一个示例

    public IntegrationFlowDefinition constructASpecificFlowDef(IntegrationFlowDefinition flowDef, RuleMetadata rule) {

    return flowDef
       // This enrichHeaders element fails to compile,
       // The method headerExpression(String, String) is undefined for the type Object
            .enrichHeaders(h -> h.headerExpression("ALC_operation", "payload[0].get(\"ALC_operation\")"));

 }

一般来说,这样的解释最好放在问题文本中,而不是作为代码片段中的注释;我完全错过了那个评论。

您能否提供一个精简(更简单)的示例(完整 class)来展示此行为,以便我们使用它?

我试图简化您正在做的事情,并且编译正常并按预期工作:

@SpringBootApplication
public class So65010958Application {

    public static void main(String[] args) {
        SpringApplication.run(So65010958Application.class, args);
    }

    @Bean
    IntegrationFlow flow() {
        return IntegrationFlows.from("foo")
                .routeToRecipients(r -> r.recipientFlow("true", f -> buildFlow(f)))
                .get();
    }

    private IntegrationFlowDefinition<?> buildFlow(IntegrationFlowDefinition<?> f) {
        return f.enrichHeaders(h -> h.headerExpression("foo", "'bar'"))
                .channel(MessageChannels.queue("bar"));
    }

    @Bean
    public ApplicationRunner runner(MessageChannel foo, PollableChannel bar) {
        return args -> {
            foo.send(new GenericMessage<>("foo"));
            System.out.println(bar.receive(0));
        };
    }

}

GenericMessage [payload=foo, headers={foo=bar, id=d526b8fb-c6f8-7731-b1ad-e68e326fcc00, timestamp=1606333567749}]

所以,我一定是漏掉了什么。