What is the proper way to advise a failed transform with Spring Integration Java DSL

I have a "happy-path" working (below).

How can I advise the .transform call so that it invokes an error flow (via errorChannel) without interrupting mainFlow?

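Currently mainFlow terminates on the first failure in the second .transform (when the payload cannot be deserialized to the type). The behavior I want is to log the failure and continue processing.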

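I have read about ExpressionEvaluatingRequestHandlerAdvice. Would I add a second argument to each .transform call, like e -> e.advice(myAdviceBean), and declare such a bean with success and error channels? I assume I would need to break up my mainFlow to receive the success from each transform.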

Following the direction of some comments, I updated the original code sample. But I'm still having trouble taking this "all the way home".

2015-09-08 11:49:19,664 [pool-3-thread-1] org.springframework.integration.handler.ServiceActivatingHandler DEBUG handler 'ServiceActivator for [org.springframework.integration.dsl.support.BeanNameMessageProcessor@5f3839ad] (org.springframework.integration.handler.ServiceActivatingHandler#0)' produced no reply for request Message: ErrorMessage [payload=org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice$MessageHandlingExpressionEvaluatingAdviceException: Handler Failed; nested exception is org.springframework.integration.transformer.MessageTransformationException: failed to transform message; nested exception is com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "hasDoaCostPriceChanged" (class com.xxx.changehistory.jdbc.data.RatePlanLevelRestrictionLog), not marked as ignorable (18 known properties: "supplierUpdateDate", "fPLOSMaskArrival", "createDate", "endAllowed", "sellStateId", "ratePlanLevel", "ratePlanId", "startAllowed", "stayDate", "doaCostPriceChanged", "hotelId", "logActionTypeId" [truncated]])
 at [Source: java.util.zip.GZIPInputStream@242017b8; line: 1, column: 32] (through reference chain: com.xxx.changehistory.jdbc.data.RatePlanLevelRestrictionLog["hasDoaCostPriceChanged"]), headers={id=c054d976-5750-827f-8894-51aba9655c77, timestamp=1441738159660}]
2015-09-08 11:49:19,664 [pool-3-thread-1] org.springframework.integration.channel.DirectChannel DEBUG postSend (sent=true) on channel 'errorChannel', message: ErrorMessage [payload=org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice$MessageHandlingExpressionEvaluatingAdviceException: Handler Failed; nested exception is org.springframework.integration.transformer.MessageTransformationException: failed to transform message; nested exception is com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "hasDoaCostPriceChanged" (class com.xxx.changehistory.jdbc.data.RatePlanLevelRestrictionLog), not marked as ignorable (18 known properties: "supplierUpdateDate", "fPLOSMaskArrival", "createDate", "endAllowed", "sellStateId", "ratePlanLevel", "ratePlanId", "startAllowed", "stayDate", "doaCostPriceChanged", "hotelId", "logActionTypeId" [truncated]])
 at [Source: java.util.zip.GZIPInputStream@242017b8; line: 1, column: 32] (through reference chain: com.xxx.changehistory.jdbc.data.RatePlanLevelRestrictionLog["hasDoaCostPriceChanged"]), headers={id=c054d976-5750-827f-8894-51aba9655c77, timestamp=1441738159660}]
2015-09-08 11:49:19,664 [pool-3-thread-1] org.springframework.integration.channel.DirectChannel DEBUG preSend on channel 'mainFlow.channel#3', message: GenericMessage [payload=java.util.zip.GZIPInputStream@242017b8, headers={id=b80106f9-7f4c-1b92-6aca-6e73d3bf8792, timestamp=1441738159664}]
2015-09-08 11:49:19,664 [pool-3-thread-1] org.springframework.integration.aggregator.AggregatingMessageHandler DEBUG org.springframework.integration.aggregator.AggregatingMessageHandler#0 received message: GenericMessage [payload=java.util.zip.GZIPInputStream@242017b8, headers={id=b80106f9-7f4c-1b92-6aca-6e73d3bf8792, timestamp=1441738159664}]
2015-09-08 11:49:19,665 [pool-3-thread-1] org.springframework.integration.channel.DirectChannel DEBUG preSend on channel 'errorChannel', message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.aggregator.AggregatingMessageHandler#0]; nested exception is java.lang.IllegalStateException: Null correlation not allowed.  Maybe the CorrelationStrategy is failing?, headers={id=24e3a1c7-af6b-032c-6a29-b55031fba0d7, timestamp=1441738159665}]
2015-09-08 11:49:19,665 [pool-3-thread-1] org.springframework.integration.handler.ServiceActivatingHandler DEBUG ServiceActivator for [org.springframework.integration.dsl.support.BeanNameMessageProcessor@5f3839ad] (org.springframework.integration.handler.ServiceActivatingHandler#0) received message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.aggregator.AggregatingMessageHandler#0]; nested exception is java.lang.IllegalStateException: Null correlation not allowed.  Maybe the CorrelationStrategy is failing?, headers={id=24e3a1c7-af6b-032c-6a29-b55031fba0d7, timestamp=1441738159665}]
2015-09-08 11:49:19,665 [pool-3-thread-1] com.xxx.DataMigrationModule$ErrorService ERROR org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.aggregator.AggregatingMessageHandler#0]; nested exception is java.lang.IllegalStateException: Null correlation not allowed.  Maybe the CorrelationStrategy is failing?
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:84)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:101)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:287)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:245)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:95)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:231)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:154)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:102)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:105)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:101)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.access$000(UnicastingDispatcher.java:48)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.run(UnicastingDispatcher.java:92)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.run(ErrorHandlingTaskExecutor.java:52)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: Null correlation not allowed.  Maybe the CorrelationStrategy is failing?
    at org.springframework.util.Assert.state(Assert.java:385)
    at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.handleMessageInternal(AbstractCorrelatingMessageHandler.java:369)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
    ... 22 more

Update (09-08-2015)

Code sample

@Bean
public IntegrationFlow mainFlow() {
    // @formatter:off
    return IntegrationFlows
            .from(
                    amazonS3InboundSynchronizationMessageSource(),
                    e -> e.poller(p -> p.trigger(this::nextExecutionTime))
            )
            .transform(unzipTransformer())
            .split(f -> new FileSplitter())
            .channel(MessageChannels.executor(Executors.newCachedThreadPool()))
            .transform(Transformers.fromJson(persistentType()), e -> e.advice(handlingAdvice()))
            // @see http://docs.spring.io/spring-integration/reference/html/messaging-routing-chapter.html#agg-and-group-to
            .aggregate(a -> 
                            a.releaseStrategy(g -> g.size() == persistenceBatchSize)
                            .expireGroupsUponCompletion(true)
                            .sendPartialResultOnExpiry(true)
                            .groupTimeoutExpression("size() ge 2 ? 10000 : -1")
                            , null
            )
            .handle(jdbcRepositoryHandler())
            // TODO add advised PollableChannel to deal with possible persistence issue and retry with partial batch
            .get();
    // @formatter:on
}

@Bean
public ErrorService errorService() {
    return new ErrorService();
}

@Bean
public MessageChannel customErrorChannel() {
    return MessageChannels.direct().get();
}

@Bean
public IntegrationFlow customErrorFlow() {
    // @formatter:off
    return IntegrationFlows
            .from(customErrorChannel())
            .handle("errorService", "handleError")
            .get();
    // @formatter:on
}

@Bean
ExpressionEvaluatingRequestHandlerAdvice handlingAdvice() {
    ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
    advice.setOnFailureExpression("payload");
    advice.setFailureChannel(customErrorChannel());
    advice.setReturnFailureExpressionResult(true);
    advice.setTrapException(true);
    return advice;
}

protected class ErrorService implements ErrorHandler {

    private final Logger log = LoggerFactory.getLogger(getClass());

    @Override
    public void handleError(Throwable t) {
        stopEndpoints(t);
    }

    private void stopEndpoints(Throwable t) {
        log.error(ExceptionUtils.getStackTrace(t));
    }

}

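Yes, you are right. To advise the handle() method of the Transformer's MessageHandler, you should use the e.advice method in the second parameter of the .transform() EIP method. And yes: you should define an ExpressionEvaluatingRequestHandlerAdvice bean for your purpose.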

You can reuse that Advice bean for different targets to handle success and failure in the same manner.

UPDATE

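Although it isn't clear to me how you would like to continue the flow with the wrong message, you can use the onFailureExpression and returnFailureExpressionResult=true options of the ExpressionEvaluatingRequestHandlerAdvice to return something after the unzipErrorChannel().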

By the way, the failureChannel logic doesn't work without an onFailureExpression:

if (this.onFailureExpression != null) {
    Object evalResult = this.evaluateFailureExpression(message, actualException);
    if (this.returnFailureExpressionResult) {
        return evalResult;
    }
}
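
As a rough illustration of why the failure channel stays silent when no onFailureExpression is set, here is a plain-Java model of the failure branch quoted above. It is a simplified stand-in, not the Spring Integration source: FailureAdviceSketch, its constructor parameters, and the Consumer that plays the role of the failure channel are hypothetical names, and the sketch assumes the send to the failure channel happens as part of evaluating the failure expression, which is what the remark above implies.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical, stdlib-only model of the advice's failure handling.
final class FailureAdviceSketch {

    private final Function<RuntimeException, Object> onFailureExpression; // null = not configured
    private final Consumer<Object> failureChannel;                        // null = not configured
    private final boolean returnFailureExpressionResult;
    private final boolean trapException;

    public FailureAdviceSketch(Function<RuntimeException, Object> onFailureExpression,
                               Consumer<Object> failureChannel,
                               boolean returnFailureExpressionResult,
                               boolean trapException) {
        this.onFailureExpression = onFailureExpression;
        this.failureChannel = failureChannel;
        this.returnFailureExpressionResult = returnFailureExpressionResult;
        this.trapException = trapException;
    }

    // Invokes the advised handler and applies the failure branch on error.
    public Object invoke(Supplier<Object> handler) {
        try {
            return handler.get();
        } catch (RuntimeException e) {
            if (onFailureExpression != null) {
                Object evalResult = onFailureExpression.apply(e);
                // Assumption: the channel is only reached from inside this branch.
                if (evalResult != null && failureChannel != null) {
                    failureChannel.accept(evalResult);
                }
                if (returnFailureExpressionResult) {
                    return evalResult;
                }
            }
            if (!trapException) {
                throw e;
            }
            return null; // exception trapped, no reply produced
        }
    }

    public static void main(String[] args) {
        List<Object> sent = new ArrayList<>();
        Supplier<Object> failing = () -> { throw new IllegalStateException("bad record"); };

        // Channel configured but no expression: nothing reaches the channel.
        FailureAdviceSketch noExpression = new FailureAdviceSketch(null, sent::add, true, true);
        System.out.println(noExpression.invoke(failing) + " / sent=" + sent);

        // Expression configured: its result is sent and also returned as the reply.
        FailureAdviceSketch withExpression =
                new FailureAdviceSketch(RuntimeException::getMessage, sent::add, true, true);
        System.out.println(withExpression.invoke(failing) + " / sent=" + sent);
    }
}
```

In this model the first call returns null and leaves the list empty, while the second returns the expression result and records it, which matches the behavior described in the thread: with returnFailureExpressionResult=true the expression result becomes the reply that continues down the flow.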

It turns out I had things wrong in a few places, such as:

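  • I had to autowire a Jackson2 ObjectMapper (which I get from Spring Boot auto-configuration) and construct a JsonObjectMapper instance to pass as the second argument to Transformers.fromJson. This made unmarshalling to the persistent type more lenient (it stopped the UnrecognizedPropertyException) and so removed the need for ExpressionEvaluatingRequestHandlerAdvice.

  • Choosing the proper variant of the .split method in IntegrationFlowDefinition in order to employ the FileSplitter. Otherwise you don't get this splitter but a DefaultMessageSplitter, which prematurely terminates the flow after the first record is read from the InputStream.

  • Moved transform, aggregate, and handle to their own pub-sub channel that uses an async task executor.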
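
To make the second point more concrete, the following stdlib-only sketch shows the behavior the flow needs from the splitter: every line of the unzipped stream becomes its own record, rather than the stream being consumed as a single payload. It does not use Spring Integration at all; LineSplitSketch, splitLines, gzip, and gunzip are hypothetical helper names, and the JSON lines are made-up sample data.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical illustration of line-by-line splitting of a gzipped stream.
final class LineSplitSketch {

    // Reads the stream line by line and hands each line to the downstream consumer.
    public static int splitLines(InputStream in, Consumer<String> downstream) {
        int count = 0;
        try (BufferedReader reader =
                     new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                downstream.accept(line);
                count++;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return count;
    }

    // Compresses text in memory so the example needs no files.
    public static byte[] gzip(String text) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(bytes)) {
            out.write(text.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Opens a decompressing stream over the given bytes.
    public static InputStream gunzip(byte[] zipped) {
        try {
            return new GZIPInputStream(new ByteArrayInputStream(zipped));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] zipped = gzip("{\"id\":1}\n{\"id\":2}\n{\"id\":3}\n");
        List<String> records = new ArrayList<>();
        int n = splitLines(gunzip(zipped), records::add);
        System.out.println(n + " records: " + records);
    }
}
```

Each record handed to the consumer here corresponds to one message that the downstream transform would deserialize on its own, so a single malformed line does not have to affect the others.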

It is still not 100% of what I need, but it is much further along.

See what I ended up with...

@Configuration
@EnableIntegration
@IntegrationComponentScan
public class DataMigrationModule {

private final Logger log = LoggerFactory.getLogger(getClass());

@Value("${cloud.aws.credentials.accessKey}")
private String accessKey;

@Value("${cloud.aws.credentials.secretKey}")
private String secretKey;

@Value("${cloud.aws.s3.bucket}")
private String bucket;

@Value("${cloud.aws.s3.max-objects-per-batch:1024}")
private int maxObjectsPerBatch;

@Value("${cloud.aws.s3.accept-subfolders:false}")
private String acceptSubFolders;

@Value("${cloud.aws.s3.remote-directory}")
private String remoteDirectory;

@Value("${cloud.aws.s3.local-directory-ref:java.io.tmpdir}")
private String localDirectoryRef;

@Value("${cloud.aws.s3.local-subdirectory:target/s3-dump}")
private String localSubdirectory;

@Value("${cloud.aws.s3.filename-wildcard:}")
private String fileNameWildcard;

@Value("${app.persistent-type:}")
private String persistentType;

@Value("${app.repository-type:}")
private String repositoryType;

@Value("${app.persistence-batch-size:2500}")
private int persistenceBatchSize;

@Value("${app.persistence-batch-release-timeout-in-milliseconds:5000}")
private int persistenceBatchReleaseTimeoutMillis;

@Autowired
private ListableBeanFactory beanFactory;

@Autowired
private ObjectMapper objectMapper;

private final AtomicBoolean invoked = new AtomicBoolean();

private Class<?> repositoryType() {
    try {
        return Class.forName(repositoryType);
    } catch (ClassNotFoundException cnfe) {
        log.error("Unknown repository implementation!", cnfe);
        System.exit(0);
    }
    return null;
}

private Class<?> persistentType() {
    try {
        return Class.forName(persistentType);
    } catch (ClassNotFoundException cnfe) {
        log.error("Unsupported type!", cnfe);
        System.exit(0);
    }
    return null;
}

public Date nextExecutionTime(TriggerContext triggerContext) {
    return this.invoked.getAndSet(true) ? null : new Date();
}

@Bean
public FileToInputStreamTransformer unzipTransformer() {
    FileToInputStreamTransformer transformer = new FileToInputStreamTransformer();
    transformer.setDeleteFiles(true);
    return transformer;
}

@Bean
public MessageSource<?> amazonS3InboundSynchronizationMessageSource() {
    AWSCredentials credentials = new BasicAWSCredentials(this.accessKey, this.secretKey);
    AmazonS3InboundSynchronizationMessageSource messageSource = new AmazonS3InboundSynchronizationMessageSource();
    messageSource.setCredentials(credentials);
    messageSource.setBucket(bucket);
    messageSource.setMaxObjectsPerBatch(maxObjectsPerBatch);
    messageSource.setAcceptSubFolders(Boolean.valueOf(acceptSubFolders));
    messageSource.setRemoteDirectory(remoteDirectory);
    if (!fileNameWildcard.isEmpty()) {
        messageSource.setFileNameWildcard(fileNameWildcard);
    }
    String directory = System.getProperty(localDirectoryRef);
    if (!localSubdirectory.startsWith("/")) {
        localSubdirectory = "/" + localSubdirectory;
    }
    if (!localSubdirectory.endsWith("/")) {
        localSubdirectory = localSubdirectory + "/";
    }
    directory = directory + localSubdirectory;
    FileUtils.mkdir(directory);
    messageSource.setDirectory(new LiteralExpression(directory));
    return messageSource;
}

@Bean
public IntegrationFlow mainFlow() {
    // @formatter:off
    return IntegrationFlows
            .from(
                    amazonS3InboundSynchronizationMessageSource(),
                    e -> e.poller(p -> p.trigger(this::nextExecutionTime))
            )
            .transform(unzipTransformer())
            .split(new FileSplitter(), null)
            .publishSubscribeChannel(new SimpleAsyncTaskExecutor(), p -> p.subscribe(persistenceSubFlow()))
            .get();
    // @formatter:on
}

@Bean
public IntegrationFlow persistenceSubFlow() {
    JsonObjectMapper<?, ?> jsonObjectMapper = new Jackson2JsonObjectMapper(objectMapper);
    ReleaseStrategy releaseStrategy = new TimeoutCountSequenceSizeReleaseStrategy(persistenceBatchSize,
            persistenceBatchReleaseTimeoutMillis);
    // @formatter:off
    return f -> f
            .transform(Transformers.fromJson(persistentType(), jsonObjectMapper))
            // @see http://docs.spring.io/spring-integration/reference/html/messaging-routing-chapter.html#agg-and-group-to
            .aggregate(
                    a -> a
                        .releaseStrategy(releaseStrategy)
                        .correlationStrategy(m -> m.getHeaders().get("id"))
                        .expireGroupsUponCompletion(true)
                        .sendPartialResultOnExpiry(true)
                        , null
            )
            .handle(jdbcRepositoryHandler());
    // @formatter:on
}

@Bean
public JdbcRepositoryHandler jdbcRepositoryHandler() {
    return new JdbcRepositoryHandler(repositoryType(), beanFactory);
}

protected class JdbcRepositoryHandler extends AbstractMessageHandler {

    @SuppressWarnings("rawtypes")
    private Insertable repository;

    public JdbcRepositoryHandler(Class<?> repositoryClass, ListableBeanFactory beanFactory) {
        repository = (Insertable<?>) beanFactory.getBean(repositoryClass);
    }

    @Override
    protected void handleMessageInternal(Message<?> message) {
        repository.insert((List<?>) message.getPayload());
    }

}

protected class FileToInputStreamTransformer extends AbstractFilePayloadTransformer<InputStream> {

    @Override
    protected InputStream transformFile(File payload) throws Exception {
        return new GZIPInputStream(new FileInputStream(payload));
    }
}

}