什么是建议失败转换的正确方法 Spring 集成 Java DSL
What is proper way to advise failed transform w/ Spring Integration Java DSL
我已经完成了一个"happy-path"(如下)。
我如何建议 .transform
调用以调用错误流(通过 errorChannel)w/o 中断 mainFlow
?
目前 mainFlow
在第二次 .transform
中出现第一次失败时终止(当有效负载无法反序列化为类型时)。我想要的行为是我想记录并继续处理。
我读过 ExpressionEvaluatingRequestHandlerAdvice
。我是否会像 e -> e.advice(myAdviceBean)
一样向每个 .transform
调用添加第二个参数并使用 success
和 error
通道声明这样的 bean?假设我需要分解我的 mainFlow
才能从每个转换中获得成功。
在一些评论的方向上,我更新了原始代码示例。但我仍然无法接受这个 "all the way home"。
2015-09-08 11:49:19,664 [pool-3-thread-1] org.springframework.integration.handler.ServiceActivatingHandler DEBUG handler 'ServiceActivator for [org.springframework.integration.dsl.support.BeanNameMessageProcessor@5f3839ad] (org.springframework.integration.handler.ServiceActivatingHandler#0)' produced no reply for request Message: ErrorMessage [payload=org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice$MessageHandlingExpressionEvaluatingAdviceException: Handler Failed; nested exception is org.springframework.integration.transformer.MessageTransformationException: failed to transform message; nested exception is com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "hasDoaCostPriceChanged" (class com.xxx.changehistory.jdbc.data.RatePlanLevelRestrictionLog), not marked as ignorable (18 known properties: "supplierUpdateDate", "fPLOSMaskArrival", "createDate", "endAllowed", "sellStateId", "ratePlanLevel", "ratePlanId", "startAllowed", "stayDate", "doaCostPriceChanged", "hotelId", "logActionTypeId" [truncated]])
at [Source: java.util.zip.GZIPInputStream@242017b8; line: 1, column: 32] (through reference chain: com.xxx.changehistory.jdbc.data.RatePlanLevelRestrictionLog["hasDoaCostPriceChanged"]), headers={id=c054d976-5750-827f-8894-51aba9655c77, timestamp=1441738159660}]
2015-09-08 11:49:19,664 [pool-3-thread-1] org.springframework.integration.channel.DirectChannel DEBUG postSend (sent=true) on channel 'errorChannel', message: ErrorMessage [payload=org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice$MessageHandlingExpressionEvaluatingAdviceException: Handler Failed; nested exception is org.springframework.integration.transformer.MessageTransformationException: failed to transform message; nested exception is com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "hasDoaCostPriceChanged" (class com.xxx.changehistory.jdbc.data.RatePlanLevelRestrictionLog), not marked as ignorable (18 known properties: "supplierUpdateDate", "fPLOSMaskArrival", "createDate", "endAllowed", "sellStateId", "ratePlanLevel", "ratePlanId", "startAllowed", "stayDate", "doaCostPriceChanged", "hotelId", "logActionTypeId" [truncated]])
at [Source: java.util.zip.GZIPInputStream@242017b8; line: 1, column: 32] (through reference chain: com.xxx.changehistory.jdbc.data.RatePlanLevelRestrictionLog["hasDoaCostPriceChanged"]), headers={id=c054d976-5750-827f-8894-51aba9655c77, timestamp=1441738159660}]
2015-09-08 11:49:19,664 [pool-3-thread-1] org.springframework.integration.channel.DirectChannel DEBUG preSend on channel 'mainFlow.channel#3', message: GenericMessage [payload=java.util.zip.GZIPInputStream@242017b8, headers={id=b80106f9-7f4c-1b92-6aca-6e73d3bf8792, timestamp=1441738159664}]
2015-09-08 11:49:19,664 [pool-3-thread-1] org.springframework.integration.aggregator.AggregatingMessageHandler DEBUG org.springframework.integration.aggregator.AggregatingMessageHandler#0 received message: GenericMessage [payload=java.util.zip.GZIPInputStream@242017b8, headers={id=b80106f9-7f4c-1b92-6aca-6e73d3bf8792, timestamp=1441738159664}]
2015-09-08 11:49:19,665 [pool-3-thread-1] org.springframework.integration.channel.DirectChannel DEBUG preSend on channel 'errorChannel', message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.aggregator.AggregatingMessageHandler#0]; nested exception is java.lang.IllegalStateException: Null correlation not allowed. Maybe the CorrelationStrategy is failing?, headers={id=24e3a1c7-af6b-032c-6a29-b55031fba0d7, timestamp=1441738159665}]
2015-09-08 11:49:19,665 [pool-3-thread-1] org.springframework.integration.handler.ServiceActivatingHandler DEBUG ServiceActivator for [org.springframework.integration.dsl.support.BeanNameMessageProcessor@5f3839ad] (org.springframework.integration.handler.ServiceActivatingHandler#0) received message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.aggregator.AggregatingMessageHandler#0]; nested exception is java.lang.IllegalStateException: Null correlation not allowed. Maybe the CorrelationStrategy is failing?, headers={id=24e3a1c7-af6b-032c-6a29-b55031fba0d7, timestamp=1441738159665}]
2015-09-08 11:49:19,665 [pool-3-thread-1] com.xxx.DataMigrationModule$ErrorService ERROR org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.aggregator.AggregatingMessageHandler#0]; nested exception is java.lang.IllegalStateException: Null correlation not allowed. Maybe the CorrelationStrategy is failing?
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:84)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:101)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:287)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:245)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:95)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:231)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:154)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:102)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:105)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:101)
at org.springframework.integration.dispatcher.UnicastingDispatcher.access[=23=]0(UnicastingDispatcher.java:48)
at org.springframework.integration.dispatcher.UnicastingDispatcher.run(UnicastingDispatcher.java:92)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.run(ErrorHandlingTaskExecutor.java:52)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: Null correlation not allowed. Maybe the CorrelationStrategy is failing?
at org.springframework.util.Assert.state(Assert.java:385)
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.handleMessageInternal(AbstractCorrelatingMessageHandler.java:369)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
... 22 more
更新 (09-08-2015)
代码示例
@Bean
public IntegrationFlow mainFlow() {
// @formatter:off
return IntegrationFlows
.from(
amazonS3InboundSynchronizationMessageSource(),
e -> e.poller(p -> p.trigger(this::nextExecutionTime))
)
.transform(unzipTransformer())
.split(f -> new FileSplitter())
.channel(MessageChannels.executor(Executors.newCachedThreadPool()))
.transform(Transformers.fromJson(persistentType()), , e -> e.advice(handlingAdvice()))
// @see http://docs.spring.io/spring-integration/reference/html/messaging-routing-chapter.html#agg-and-group-to
.aggregate(a ->
a.releaseStrategy(g -> g.size() == persistenceBatchSize)
.expireGroupsUponCompletion(true)
.sendPartialResultOnExpiry(true)
.groupTimeoutExpression("size() ge 2 ? 10000 : -1")
, null
)
.handle(jdbcRepositoryHandler())
// TODO add advised PollableChannel to deal with possible persistence issue and retry with partial batch
.get();
// @formatter:on
}
@Bean
public ErrorService errorService() {
return new ErrorService();
}
@Bean
public MessageChannel customErrorChannel() {
return MessageChannels.direct().get();
}
@Bean
public IntegrationFlow customErrorFlow() {
// @formatter:off
return IntegrationFlows
.from(customErrorChannel())
.handle("errorService", "handleError")
.get();
// @formatter:on
}
@Bean
ExpressionEvaluatingRequestHandlerAdvice handlingAdvice() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setOnFailureExpression("payload");
advice.setFailureChannel(customErrorChannel());
advice.setReturnFailureExpressionResult(true);
advice.setTrapException(true);
return advice;
}
protected class ErrorService implements ErrorHandler {
private final Logger log = LoggerFactory.getLogger(getClass());
@Override
public void handleError(Throwable t) {
stopEndpoints(t);
}
private void stopEndpoints(Throwable t) {
log.error(ExceptionUtils.getStackTrace(t));
}
}
是的,你是对的。对于 advice
Transformer MessageHandler
的 handle()
方法,您应该使用 .transform()
EIP 方法的第二个参数的 e.advice
方法。是的:您应该根据您的目的定义 ExpressionEvaluatingRequestHandlerAdvice
bean。
您可以为不同的目标重用该 Advice
bean,以相同的方式处理成功和失败。
更新
虽然我不清楚你想如何继续错误消息的流程,但你可以使用 ExpressionEvaluatingRequestHandlerAdvice
的 onFailureExpression
和 returnFailureExpressionResult=true
return unzipErrorChannel()
.
之后的内容
顺便说一句,如果没有 onFailureExpression
,failureChannel
逻辑就无法工作:
if (this.onFailureExpression != null) {
Object evalResult = this.evaluateFailureExpression(message, actualException);
if (this.returnFailureExpressionResult) {
return evalResult;
}
}
原来我在几个地方出了问题,比如:
我必须自动装配一个 Jackson2 ObjectMapper
(我从 Sprint Boot 自动配置中获得)并构造一个 JsonObjectMapper
的实例作为第二个参数添加到 Transformers.fromJson
;更宽松地解组为持久类型(停止 UnrecognizedPropertyException
);从而免除了 ExpressionEvaluatingRequestHandlerAdvice
的需要
在 IntegrationFlowDefinition
中选择 .split
方法的正确变体以使用 FileSplitter
,否则你不会得到这个分离器而是 DefaultMessageSplitter
在从 InputStream
读取第一条记录后提前终止流程
已将 transform
、aggregate
、handle
移动到使用异步任务执行程序的自己的 pubsub 频道
仍然不是我所需要的 100%,但还远远不够。
看看我最终得到了什么...
@Configuration
@EnableIntegration
@IntegrationComponentScan
public class DataMigrationModule {
private final Logger log = LoggerFactory.getLogger(getClass());
@Value("${cloud.aws.credentials.accessKey}")
private String accessKey;
@Value("${cloud.aws.credentials.secretKey}")
private String secretKey;
@Value("${cloud.aws.s3.bucket}")
private String bucket;
@Value("${cloud.aws.s3.max-objects-per-batch:1024}")
private int maxObjectsPerBatch;
@Value("${cloud.aws.s3.accept-subfolders:false}")
private String acceptSubFolders;
@Value("${cloud.aws.s3.remote-directory}")
private String remoteDirectory;
@Value("${cloud.aws.s3.local-directory-ref:java.io.tmpdir}")
private String localDirectoryRef;
@Value("${cloud.aws.s3.local-subdirectory:target/s3-dump}")
private String localSubdirectory;
@Value("${cloud.aws.s3.filename-wildcard:}")
private String fileNameWildcard;
@Value("${app.persistent-type:}")
private String persistentType;
@Value("${app.repository-type:}")
private String repositoryType;
@Value("${app.persistence-batch-size:2500}")
private int persistenceBatchSize;
@Value("${app.persistence-batch-release-timeout-in-milliseconds:5000}")
private int persistenceBatchReleaseTimeoutMillis;
@Autowired
private ListableBeanFactory beanFactory;
@Autowired
private ObjectMapper objectMapper;
private final AtomicBoolean invoked = new AtomicBoolean();
private Class<?> repositoryType() {
try {
return Class.forName(repositoryType);
} catch (ClassNotFoundException cnfe) {
log.error("Unknown repository implementation!", cnfe);
System.exit(0);
}
return null;
}
private Class<?> persistentType() {
try {
return Class.forName(persistentType);
} catch (ClassNotFoundException cnfe) {
log.error("Unsupported type!", cnfe);
System.exit(0);
}
return null;
}
public Date nextExecutionTime(TriggerContext triggerContext) {
return this.invoked.getAndSet(true) ? null : new Date();
}
@Bean
public FileToInputStreamTransformer unzipTransformer() {
FileToInputStreamTransformer transformer = new FileToInputStreamTransformer();
transformer.setDeleteFiles(true);
return transformer;
}
@Bean
public MessageSource<?> amazonS3InboundSynchronizationMessageSource() {
AWSCredentials credentials = new BasicAWSCredentials(this.accessKey, this.secretKey);
AmazonS3InboundSynchronizationMessageSource messageSource = new AmazonS3InboundSynchronizationMessageSource();
messageSource.setCredentials(credentials);
messageSource.setBucket(bucket);
messageSource.setMaxObjectsPerBatch(maxObjectsPerBatch);
messageSource.setAcceptSubFolders(Boolean.valueOf(acceptSubFolders));
messageSource.setRemoteDirectory(remoteDirectory);
if (!fileNameWildcard.isEmpty()) {
messageSource.setFileNameWildcard(fileNameWildcard);
}
String directory = System.getProperty(localDirectoryRef);
if (!localSubdirectory.startsWith("/")) {
localSubdirectory = "/" + localSubdirectory;
}
if (!localSubdirectory.endsWith("/")) {
localSubdirectory = localSubdirectory + "/";
}
directory = directory + localSubdirectory;
FileUtils.mkdir(directory);
messageSource.setDirectory(new LiteralExpression(directory));
return messageSource;
}
@Bean
public IntegrationFlow mainFlow() {
// @formatter:off
return IntegrationFlows
.from(
amazonS3InboundSynchronizationMessageSource(),
e -> e.poller(p -> p.trigger(this::nextExecutionTime))
)
.transform(unzipTransformer())
.split(new FileSplitter(), null)
.publishSubscribeChannel(new SimpleAsyncTaskExecutor(), p -> p.subscribe(persistenceSubFlow()))
.get();
// @formatter:on
}
@Bean
public IntegrationFlow persistenceSubFlow() {
JsonObjectMapper<?, ?> jsonObjectMapper = new Jackson2JsonObjectMapper(objectMapper);
ReleaseStrategy releaseStrategy = new TimeoutCountSequenceSizeReleaseStrategy(persistenceBatchSize,
persistenceBatchReleaseTimeoutMillis);
// @formatter:off
return f -> f
.transform(Transformers.fromJson(persistentType(), jsonObjectMapper))
// @see http://docs.spring.io/spring-integration/reference/html/messaging-routing-chapter.html#agg-and-group-to
.aggregate(
a -> a
.releaseStrategy(releaseStrategy)
.correlationStrategy(m -> m.getHeaders().get("id"))
.expireGroupsUponCompletion(true)
.sendPartialResultOnExpiry(true)
, null
)
.handle(jdbcRepositoryHandler());
// @formatter:on
}
@Bean
public JdbcRepositoryHandler jdbcRepositoryHandler() {
return new JdbcRepositoryHandler(repositoryType(), beanFactory);
}
protected class JdbcRepositoryHandler extends AbstractMessageHandler {
@SuppressWarnings("rawtypes")
private Insertable repository;
public JdbcRepositoryHandler(Class<?> repositoryClass, ListableBeanFactory beanFactory) {
repository = (Insertable<?>) beanFactory.getBean(repositoryClass);
}
@Override
protected void handleMessageInternal(Message<?> message) {
repository.insert((List<?>) message.getPayload());
}
}
protected class FileToInputStreamTransformer extends AbstractFilePayloadTransformer<InputStream> {
@Override
protected InputStream transformFile(File payload) throws Exception {
return new GZIPInputStream(new FileInputStream(payload));
}
}
}
我已经完成了一个"happy-path"(如下)。
我如何建议 .transform
调用以调用错误流(通过 errorChannel)w/o 中断 mainFlow
?
目前 mainFlow
在第二次 .transform
中出现第一次失败时终止(当有效负载无法反序列化为类型时)。我想要的行为是我想记录并继续处理。
我读过 ExpressionEvaluatingRequestHandlerAdvice
。我是否会像 e -> e.advice(myAdviceBean)
一样向每个 .transform
调用添加第二个参数并使用 success
和 error
通道声明这样的 bean?假设我需要分解我的 mainFlow
才能从每个转换中获得成功。
在一些评论的方向上,我更新了原始代码示例。但我仍然无法接受这个 "all the way home"。
2015-09-08 11:49:19,664 [pool-3-thread-1] org.springframework.integration.handler.ServiceActivatingHandler DEBUG handler 'ServiceActivator for [org.springframework.integration.dsl.support.BeanNameMessageProcessor@5f3839ad] (org.springframework.integration.handler.ServiceActivatingHandler#0)' produced no reply for request Message: ErrorMessage [payload=org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice$MessageHandlingExpressionEvaluatingAdviceException: Handler Failed; nested exception is org.springframework.integration.transformer.MessageTransformationException: failed to transform message; nested exception is com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "hasDoaCostPriceChanged" (class com.xxx.changehistory.jdbc.data.RatePlanLevelRestrictionLog), not marked as ignorable (18 known properties: "supplierUpdateDate", "fPLOSMaskArrival", "createDate", "endAllowed", "sellStateId", "ratePlanLevel", "ratePlanId", "startAllowed", "stayDate", "doaCostPriceChanged", "hotelId", "logActionTypeId" [truncated]]) at [Source: java.util.zip.GZIPInputStream@242017b8; line: 1, column: 32] (through reference chain: com.xxx.changehistory.jdbc.data.RatePlanLevelRestrictionLog["hasDoaCostPriceChanged"]), headers={id=c054d976-5750-827f-8894-51aba9655c77, timestamp=1441738159660}] 2015-09-08 11:49:19,664 [pool-3-thread-1] org.springframework.integration.channel.DirectChannel DEBUG postSend (sent=true) on channel 'errorChannel', message: ErrorMessage [payload=org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice$MessageHandlingExpressionEvaluatingAdviceException: Handler Failed; nested exception is org.springframework.integration.transformer.MessageTransformationException: failed to transform message; nested exception is com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "hasDoaCostPriceChanged" (class com.xxx.changehistory.jdbc.data.RatePlanLevelRestrictionLog), not marked as ignorable (18 known properties: "supplierUpdateDate", "fPLOSMaskArrival", "createDate", "endAllowed", "sellStateId", "ratePlanLevel", "ratePlanId", "startAllowed", "stayDate", "doaCostPriceChanged", "hotelId", "logActionTypeId" [truncated]]) at [Source: java.util.zip.GZIPInputStream@242017b8; line: 1, column: 32] (through reference chain: com.xxx.changehistory.jdbc.data.RatePlanLevelRestrictionLog["hasDoaCostPriceChanged"]), headers={id=c054d976-5750-827f-8894-51aba9655c77, timestamp=1441738159660}] 2015-09-08 11:49:19,664 [pool-3-thread-1] org.springframework.integration.channel.DirectChannel DEBUG preSend on channel 'mainFlow.channel#3', message: GenericMessage [payload=java.util.zip.GZIPInputStream@242017b8, headers={id=b80106f9-7f4c-1b92-6aca-6e73d3bf8792, timestamp=1441738159664}] 2015-09-08 11:49:19,664 [pool-3-thread-1] org.springframework.integration.aggregator.AggregatingMessageHandler DEBUG org.springframework.integration.aggregator.AggregatingMessageHandler#0 received message: GenericMessage [payload=java.util.zip.GZIPInputStream@242017b8, headers={id=b80106f9-7f4c-1b92-6aca-6e73d3bf8792, timestamp=1441738159664}] 2015-09-08 11:49:19,665 [pool-3-thread-1] org.springframework.integration.channel.DirectChannel DEBUG preSend on channel 'errorChannel', message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.aggregator.AggregatingMessageHandler#0]; nested exception is java.lang.IllegalStateException: Null correlation not allowed. Maybe the CorrelationStrategy is failing?, headers={id=24e3a1c7-af6b-032c-6a29-b55031fba0d7, timestamp=1441738159665}] 2015-09-08 11:49:19,665 [pool-3-thread-1] org.springframework.integration.handler.ServiceActivatingHandler DEBUG ServiceActivator for [org.springframework.integration.dsl.support.BeanNameMessageProcessor@5f3839ad] (org.springframework.integration.handler.ServiceActivatingHandler#0) received message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.aggregator.AggregatingMessageHandler#0]; nested exception is java.lang.IllegalStateException: Null correlation not allowed. Maybe the CorrelationStrategy is failing?, headers={id=24e3a1c7-af6b-032c-6a29-b55031fba0d7, timestamp=1441738159665}] 2015-09-08 11:49:19,665 [pool-3-thread-1] com.xxx.DataMigrationModule$ErrorService ERROR org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.aggregator.AggregatingMessageHandler#0]; nested exception is java.lang.IllegalStateException: Null correlation not allowed. Maybe the CorrelationStrategy is failing? at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:84) at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:101) at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97) at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:287) at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:245) at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:95) at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:231) at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:154) at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:102) at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:105) at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78) at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:101) at org.springframework.integration.dispatcher.UnicastingDispatcher.access[=23=]0(UnicastingDispatcher.java:48) at org.springframework.integration.dispatcher.UnicastingDispatcher.run(UnicastingDispatcher.java:92) at org.springframework.integration.util.ErrorHandlingTaskExecutor.run(ErrorHandlingTaskExecutor.java:52) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.IllegalStateException: Null correlation not allowed. Maybe the CorrelationStrategy is failing? at org.springframework.util.Assert.state(Assert.java:385) at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.handleMessageInternal(AbstractCorrelatingMessageHandler.java:369) at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78) ... 22 more
更新 (09-08-2015)
代码示例
@Bean
public IntegrationFlow mainFlow() {
// @formatter:off
return IntegrationFlows
.from(
amazonS3InboundSynchronizationMessageSource(),
e -> e.poller(p -> p.trigger(this::nextExecutionTime))
)
.transform(unzipTransformer())
.split(f -> new FileSplitter())
.channel(MessageChannels.executor(Executors.newCachedThreadPool()))
.transform(Transformers.fromJson(persistentType()), , e -> e.advice(handlingAdvice()))
// @see http://docs.spring.io/spring-integration/reference/html/messaging-routing-chapter.html#agg-and-group-to
.aggregate(a ->
a.releaseStrategy(g -> g.size() == persistenceBatchSize)
.expireGroupsUponCompletion(true)
.sendPartialResultOnExpiry(true)
.groupTimeoutExpression("size() ge 2 ? 10000 : -1")
, null
)
.handle(jdbcRepositoryHandler())
// TODO add advised PollableChannel to deal with possible persistence issue and retry with partial batch
.get();
// @formatter:on
}
@Bean
public ErrorService errorService() {
return new ErrorService();
}
@Bean
public MessageChannel customErrorChannel() {
return MessageChannels.direct().get();
}
@Bean
public IntegrationFlow customErrorFlow() {
// @formatter:off
return IntegrationFlows
.from(customErrorChannel())
.handle("errorService", "handleError")
.get();
// @formatter:on
}
@Bean
ExpressionEvaluatingRequestHandlerAdvice handlingAdvice() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setOnFailureExpression("payload");
advice.setFailureChannel(customErrorChannel());
advice.setReturnFailureExpressionResult(true);
advice.setTrapException(true);
return advice;
}
protected class ErrorService implements ErrorHandler {
private final Logger log = LoggerFactory.getLogger(getClass());
@Override
public void handleError(Throwable t) {
stopEndpoints(t);
}
private void stopEndpoints(Throwable t) {
log.error(ExceptionUtils.getStackTrace(t));
}
}
是的,你是对的。对于 advice
Transformer MessageHandler
的 handle()
方法,您应该使用 .transform()
EIP 方法的第二个参数的 e.advice
方法。是的:您应该根据您的目的定义 ExpressionEvaluatingRequestHandlerAdvice
bean。
您可以为不同的目标重用该 Advice
bean,以相同的方式处理成功和失败。
更新
虽然我不清楚你想如何继续错误消息的流程,但你可以使用 ExpressionEvaluatingRequestHandlerAdvice
的 onFailureExpression
和 returnFailureExpressionResult=true
return unzipErrorChannel()
.
顺便说一句,如果没有 onFailureExpression
,failureChannel
逻辑就无法工作:
if (this.onFailureExpression != null) {
Object evalResult = this.evaluateFailureExpression(message, actualException);
if (this.returnFailureExpressionResult) {
return evalResult;
}
}
原来我在几个地方出了问题,比如:
我必须自动装配一个 Jackson2
ObjectMapper
(我从 Sprint Boot 自动配置中获得)并构造一个JsonObjectMapper
的实例作为第二个参数添加到Transformers.fromJson
;更宽松地解组为持久类型(停止UnrecognizedPropertyException
);从而免除了ExpressionEvaluatingRequestHandlerAdvice
的需要
在
IntegrationFlowDefinition
中选择.split
方法的正确变体以使用FileSplitter
,否则你不会得到这个分离器而是DefaultMessageSplitter
在从InputStream
读取第一条记录后提前终止流程
已将
transform
、aggregate
、handle
移动到使用异步任务执行程序的自己的 pubsub 频道
仍然不是我所需要的 100%,但还远远不够。
看看我最终得到了什么...
@Configuration
@EnableIntegration
@IntegrationComponentScan
public class DataMigrationModule {
private final Logger log = LoggerFactory.getLogger(getClass());
@Value("${cloud.aws.credentials.accessKey}")
private String accessKey;
@Value("${cloud.aws.credentials.secretKey}")
private String secretKey;
@Value("${cloud.aws.s3.bucket}")
private String bucket;
@Value("${cloud.aws.s3.max-objects-per-batch:1024}")
private int maxObjectsPerBatch;
@Value("${cloud.aws.s3.accept-subfolders:false}")
private String acceptSubFolders;
@Value("${cloud.aws.s3.remote-directory}")
private String remoteDirectory;
@Value("${cloud.aws.s3.local-directory-ref:java.io.tmpdir}")
private String localDirectoryRef;
@Value("${cloud.aws.s3.local-subdirectory:target/s3-dump}")
private String localSubdirectory;
@Value("${cloud.aws.s3.filename-wildcard:}")
private String fileNameWildcard;
@Value("${app.persistent-type:}")
private String persistentType;
@Value("${app.repository-type:}")
private String repositoryType;
@Value("${app.persistence-batch-size:2500}")
private int persistenceBatchSize;
@Value("${app.persistence-batch-release-timeout-in-milliseconds:5000}")
private int persistenceBatchReleaseTimeoutMillis;
@Autowired
private ListableBeanFactory beanFactory;
@Autowired
private ObjectMapper objectMapper;
private final AtomicBoolean invoked = new AtomicBoolean();
private Class<?> repositoryType() {
try {
return Class.forName(repositoryType);
} catch (ClassNotFoundException cnfe) {
log.error("Unknown repository implementation!", cnfe);
System.exit(0);
}
return null;
}
private Class<?> persistentType() {
try {
return Class.forName(persistentType);
} catch (ClassNotFoundException cnfe) {
log.error("Unsupported type!", cnfe);
System.exit(0);
}
return null;
}
public Date nextExecutionTime(TriggerContext triggerContext) {
return this.invoked.getAndSet(true) ? null : new Date();
}
@Bean
public FileToInputStreamTransformer unzipTransformer() {
FileToInputStreamTransformer transformer = new FileToInputStreamTransformer();
transformer.setDeleteFiles(true);
return transformer;
}
@Bean
public MessageSource<?> amazonS3InboundSynchronizationMessageSource() {
AWSCredentials credentials = new BasicAWSCredentials(this.accessKey, this.secretKey);
AmazonS3InboundSynchronizationMessageSource messageSource = new AmazonS3InboundSynchronizationMessageSource();
messageSource.setCredentials(credentials);
messageSource.setBucket(bucket);
messageSource.setMaxObjectsPerBatch(maxObjectsPerBatch);
messageSource.setAcceptSubFolders(Boolean.valueOf(acceptSubFolders));
messageSource.setRemoteDirectory(remoteDirectory);
if (!fileNameWildcard.isEmpty()) {
messageSource.setFileNameWildcard(fileNameWildcard);
}
String directory = System.getProperty(localDirectoryRef);
if (!localSubdirectory.startsWith("/")) {
localSubdirectory = "/" + localSubdirectory;
}
if (!localSubdirectory.endsWith("/")) {
localSubdirectory = localSubdirectory + "/";
}
directory = directory + localSubdirectory;
FileUtils.mkdir(directory);
messageSource.setDirectory(new LiteralExpression(directory));
return messageSource;
}
@Bean
public IntegrationFlow mainFlow() {
// @formatter:off
return IntegrationFlows
.from(
amazonS3InboundSynchronizationMessageSource(),
e -> e.poller(p -> p.trigger(this::nextExecutionTime))
)
.transform(unzipTransformer())
.split(new FileSplitter(), null)
.publishSubscribeChannel(new SimpleAsyncTaskExecutor(), p -> p.subscribe(persistenceSubFlow()))
.get();
// @formatter:on
}
@Bean
public IntegrationFlow persistenceSubFlow() {
JsonObjectMapper<?, ?> jsonObjectMapper = new Jackson2JsonObjectMapper(objectMapper);
ReleaseStrategy releaseStrategy = new TimeoutCountSequenceSizeReleaseStrategy(persistenceBatchSize,
persistenceBatchReleaseTimeoutMillis);
// @formatter:off
return f -> f
.transform(Transformers.fromJson(persistentType(), jsonObjectMapper))
// @see http://docs.spring.io/spring-integration/reference/html/messaging-routing-chapter.html#agg-and-group-to
.aggregate(
a -> a
.releaseStrategy(releaseStrategy)
.correlationStrategy(m -> m.getHeaders().get("id"))
.expireGroupsUponCompletion(true)
.sendPartialResultOnExpiry(true)
, null
)
.handle(jdbcRepositoryHandler());
// @formatter:on
}
@Bean
public JdbcRepositoryHandler jdbcRepositoryHandler() {
return new JdbcRepositoryHandler(repositoryType(), beanFactory);
}
protected class JdbcRepositoryHandler extends AbstractMessageHandler {
@SuppressWarnings("rawtypes")
private Insertable repository;
public JdbcRepositoryHandler(Class<?> repositoryClass, ListableBeanFactory beanFactory) {
repository = (Insertable<?>) beanFactory.getBean(repositoryClass);
}
@Override
protected void handleMessageInternal(Message<?> message) {
repository.insert((List<?>) message.getPayload());
}
}
protected class FileToInputStreamTransformer extends AbstractFilePayloadTransformer<InputStream> {
@Override
protected InputStream transformFile(File payload) throws Exception {
return new GZIPInputStream(new FileInputStream(payload));
}
}
}