Handle a method return
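In my project, I call a process method inside the handle method. Based on its return value, which is a boolean, I have to notify the advice of success (true) or failure (false).
Can anyone tell me how to do this in an integration flow? Is there perhaps another way to implement it?
Code: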
@Bean
public IntegrationFlow ftpInboundFlow() {
    return IntegrationFlows
            .from(Ftp.inboundAdapter(ftpSessionFactory())
                            .preserveTimestamp(true)
                            .remoteDirectory(appProperties.getFtp().getRemoteDirectory())
                            .patternFilter(appProperties.getFtp().getFilter())
                            .deleteRemoteFiles(true)
                            .localDirectory(new File("inbound"))
                            .temporaryFileSuffix(appProperties.getFtp().getTemporaryFileSuffix()),
                    e -> e.id("ftpInboundAdapter")
                            .poller(Pollers.fixedDelay(appProperties.getFtp().getPollerDelay()))
                            .autoStartup(true))
            .transform(new FileToByteArrayTransformer())
            .<byte[]>handle((p, h) -> {
                log.info("After transform " + p);
                log.info("Headers " + h);
                CustomerFile customerFile = CustomerFile.builder()
                        .content(p)
                        .customerFileType(CustomerFileType.ORDER_BOOK)
                        .filename(appProperties.ftp.getFileName())
                        .build();
                customerFileDataService.save(customerFile);
                boolean isProssed = orderBookFileProcessor.process(customerFile);
                log.info("is Processed : " + isProssed);
                return isProssed;
            })
            .route(Boolean.class, p -> p.equals(false) ? "historyChannel" : "errorChannel")
            .get();
}

@Bean
public IntegrationFlow history(){
    return IntegrationFlows.from("historyChannel")
            .transform("genericMessage.headers['file_originalFile']")
            .handle(Ftp.outboundAdapter(ftpSessionFactory(), FileExistsMode.REPLACE)
                    .useTemporaryFileName(true)
                    .autoCreateDirectory(true)
                    .remoteDirectory("/ftp/ge/inbound/history"))
            .get();
}
Error:
Caused by: org.springframework.messaging.MessageHandlingException: Expression evaluation failed: genericMessage.headers['file_originalFile']; nested exception is org.springframework.expression.spel.SpelEvaluationException: EL1008E: Property or field 'genericMessage' cannot be found on object of type 'org.springframework.messaging.support.GenericMessage' - maybe not public or not valid?, failedMessage=GenericMessage [payload=false, headers={file_remoteHostPort=bey-notes-fs.bey.ei:21, file_name=OA_ex_PK_2020_2023.csv, file_remoteDirectory=//ftp/GE/Inbound, file_originalFile=inbound\OA_ex_PK_2020_2023.csv, id=3a6ecf72-e9cd-43e6-bf0c-25020c0ab30d, file_relativePath=OA_ex_PK_2020_2023.csv, file_remoteFile=OA_ex_PK_2020_2023.csv, timestamp=1640030927539}]
I think you need to look at a different handle() variant:
/**
 * Populate a {@link ServiceActivatingHandler} for the
 * {@link org.springframework.integration.handler.MethodInvokingMessageProcessor}
 * to invoke the provided {@link GenericHandler} at runtime.
 * Typically used with a Java 8 Lambda expression:
 * <pre class="code">
 * {@code
 *  .<Integer>handle((p, h) -> p / 2)
 * }
 * </pre>
 * Use {@link #handle(Class, GenericHandler)} if you need to access the entire
 * message.
 * @param handler the handler to invoke.
 * @param <P> the payload type to expect.
 * @return the current {@link IntegrationFlowDefinition}.
 * @see org.springframework.integration.handler.LambdaMessageProcessor
 */
public <P> B handle(GenericHandler<P> handler) {
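The one you chose has the plain void handle(Message<?>) contract, so that call does not produce any return value. Because you have a result from the process() method call and want to act on it, you need to change the flow behavior at this point from a consumer to a function: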
.<byte[]>handle((p, h) -> {
    log.info("After transform " + p);
    CustomerFile customerFile = CustomerFile.builder()
            .content(p)
            .customerFileType(CustomerFileType.ORDER_BOOK)
            .build();
    customerFileService.saveCustomerFile(customerFile);
    return orderBookFileProcessor.process(customerFile);
})
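To make the consumer-versus-function distinction concrete, here is a minimal plain-Java sketch that does not use Spring Integration at all. The class name HandlerContractSketch, the stand-in process() method, and the channel names successChannel and failureChannel are all hypothetical and chosen only for illustration. It shows only that a step which returns its result gives the next step something to route on, while a void step does not.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;
import java.util.function.Function;

public class HandlerContractSketch {

    // Hypothetical stand-in for orderBookFileProcessor.process(...):
    // treats any non-empty content as successfully processed.
    static boolean process(byte[] content) {
        return content.length > 0;
    }

    // Consumer-style step: process() runs, but its boolean result is discarded,
    // so nothing downstream can react to it.
    static final Consumer<byte[]> CONSUMER_STEP = content -> process(content);

    // Function-style step: the boolean becomes the output handed to the next step.
    static final Function<byte[], Boolean> FUNCTION_STEP = content -> process(content);

    // Downstream step that picks a channel name from the boolean result.
    // The channel names are illustrative assumptions, not taken from the question.
    static String route(boolean processed) {
        return processed ? "successChannel" : "failureChannel";
    }

    public static void main(String[] args) {
        byte[] file = "order;book".getBytes(StandardCharsets.UTF_8);

        // The consumer contract returns void: there is no value to route on.
        CONSUMER_STEP.accept(file);

        // The function contract returns the result, which the router can use.
        System.out.println(route(FUNCTION_STEP.apply(file)));
        System.out.println(route(FUNCTION_STEP.apply(new byte[0])));
    }
}
```

In the flow, the lambda passed to handle() plays the role of FUNCTION_STEP, and a subsequent routing step plays the role of route().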