处理方法 return

Handle a method return

在我的项目中,我将在 handle 方法中使用 process 方法,并且基于 return 类型,这将是 boolean 我将必须通知建议成功为真或失败为假。

有人可以告诉我如何在集成流程中完成吗?可能还有其他实现吗?

代码:

 @Bean
public IntegrationFlow ftpInboundFlow() {
    return IntegrationFlows
            .from(Ftp.inboundAdapter(ftpSessionFactory())
                            .preserveTimestamp(true)
                            .remoteDirectory(appProperties.getFtp().getRemoteDirectory())
                            .patternFilter(appProperties.getFtp().getFilter())
                            .deleteRemoteFiles(true)
                            .localDirectory(new File("inbound"))
                            .temporaryFileSuffix(appProperties.getFtp().getTemporaryFileSuffix()),
                    e -> e.id("ftpInboundAdapter")
                            .poller(Pollers.fixedDelay(appProperties.getFtp().getPollerDelay()))
                            .autoStartup(true))
            .transform(new FileToByteArrayTransformer())
            .<byte[]>handle((p, h) -> {
                log.info("After transform " + p);
                log.info("Headers " + h);
                CustomerFile customerFile = CustomerFile.builder()
                        .content(p)
                        .customerFileType(CustomerFileType.ORDER_BOOK)
                        .filename(appProperties.ftp.getFileName())
                        .build();
                customerFileDataService.save(customerFile);
                boolean isProssed = orderBookFileProcessor.process(customerFile);
                log.info("is Processed : " + isProssed);
                return isProssed;
            })
            .route(Boolean.class, p -> p.equals(false) ? "historyChannel" : "errorChannel")
            .get();
}

@Bean
public IntegrationFlow history(){
    return IntegrationFlows.from("historyChannel")
            .transform("genericMessage.headers['file_originalFile']")
            .handle(Ftp.outboundAdapter(ftpSessionFactory(), FileExistsMode.REPLACE)
                    .useTemporaryFileName(true)
                    .autoCreateDirectory(true)
                    .remoteDirectory("/ftp/ge/inbound/history"))
                    .get();
}

错误:

Caused by: org.springframework.messaging.MessageHandlingException: Expression evaluation failed: genericMessage.headers['file_originalFile']; nested exception is org.springframework.expression.spel.SpelEvaluationException: EL1008E: Property or field 'genericMessage' cannot be found on object of type 'org.springframework.messaging.support.GenericMessage' - maybe not public or not valid?, failedMessage=GenericMessage [payload=false, headers={file_remoteHostPort=bey-notes-fs.bey.ei:21, file_name=OA_ex_PK_2020_2023.csv, file_remoteDirectory=//ftp/GE/Inbound, file_originalFile=inbound\OA_ex_PK_2020_2023.csv, id=3a6ecf72-e9cd-43e6-bf0c-25020c0ab30d, file_relativePath=OA_ex_PK_2020_2023.csv, file_remoteFile=OA_ex_PK_2020_2023.csv, timestamp=1640030927539}]

我认为您需要查看不同的 handle() 变体:

/**
 * Populate a {@link ServiceActivatingHandler} for the
 * {@link org.springframework.integration.handler.MethodInvokingMessageProcessor}
 * to invoke the provided {@link GenericHandler} at runtime.
 * Typically used with a Java 8 Lambda expression:
 * <pre class="code">
 * {@code
 *  .<Integer>handle((p, h) -> p / 2)
 * }
 * </pre>
 * Use {@link #handle(Class, GenericHandler)} if you need to access the entire
 * message.
 * @param handler the handler to invoke.
 * @param <P> the payload type to expect.
 * @return the current {@link IntegrationFlowDefinition}.
 * @see org.springframework.integration.handler.LambdaMessageProcessor
 */
public <P> B handle(GenericHandler<P> handler) {

您选择的那个带有普通 void handle(Message<?>) 合同。因此,此调用不会产生任何 return。由于您有来自 process() 方法调用的结果并且您想要处理它,因此您需要将流行为从 consumer 更改为 function 此时:

     .<byte[]>handle( (p, h) -> {
                log.info("After transform " + p);
                CustomerFile customerFile = CustomerFile.builder()
                        .content(p)
                        .customerFileType(CustomerFileType.ORDER_BOOK)
                        .build();
                customerFileService.saveCustomerFile(customerFile);
                return orderBookFileProcessor.process(customerFile); 
            })