Spring 集成 DSL 自定义错误通道不起作用
Spring Integration DSL Custom Error Channel Not Work
我使用 Spring 集成的 DSL 实现。
我有下面的代码,我不能使用我的自定义错误流程。当 authenticate 方法抛出 Runtime Exception 时,errorChannel 开始处理。我丰富了 header 以使用我的自定义错误流,但没有使用。
// In Class - 1
@Bean
public MarshallingWebServiceInboundGateway marshallingWebServiceInboundGateway(BeanFactoryChannelResolver channelResolver, Jaxb2Marshaller marshaller) {
MarshallingWebServiceInboundGateway wsInboundGateway = new MarshallingWebServiceInboundGateway();
wsInboundGateway.setRequestChannel(channelResolver.resolveDestination("incomingRequest.input"));
wsInboundGateway.setReplyChannel(channelResolver.resolveDestination("outgoingResponse.input"));
wsInboundGateway.setErrorChannel(channelResolver.resolveDestination("errorChannel"));
wsInboundGateway.setMarshaller(marshaller);
wsInboundGateway.setUnmarshaller(marshaller);
return wsInboundGateway;
}
// In Class - 2
@Bean
public IntegrationFlow incomingRequest() {
return f -> f.<Object, Class<?>>route(t -> t.getClass(),
mapping -> mapping.subFlowMapping(payloadType1(),
sf -> sf.gateway("type1.input", ConsumerEndpointSpec::transactional))
.subFlowMapping(payloadType2(),
sf -> sf.gateway("type2.input", ConsumerEndpointSpec::transactional)),
conf -> conf.id("router:Incoming request router"));
}
// In Class - 3
@Bean
public IntegrationFlow type1() {
IntegrationFlow integrationFlow = f -> f
.enrichHeaders(h -> h.header(MessageHeaders.ERROR_CHANNEL, "error222", true))
.<Type1>handle((p, h) -> authentication.authenticate(p),
conf -> conf.id("service-activator:Authenticate"))
.transform(transformer::transformType1MsgToDataX,
conf -> conf.id("transform:Unmarshall type1 Message"))
.enrichHeaders(h -> h.headerExpression(TypeDataIntegrationMessageHeaderAccessor.MESSAGE_ID, "payload.id")
.headerExpression(TypeDataIntegrationMessageHeaderAccessor.MESSAGE_TYPE, "payload.messageType"))
.handle((GenericHandler<DataX>) repository::successResponseMessage,
conf -> conf.id("service-activator:return success"))
.channel("outgoingResponse.input")
;
return integrationFlow;
}
// In Class - 3
@Bean
public IntegrationFlow error222Flow() {
return IntegrationFlows.from("error222").handle("repository", "failureResponseMessage").get()
;
}
编辑:
在 Artem 的回答之后,我的代码如下所示。但是,我仍然无法访问错误流中的 header 参数。我收到错误 - "No channel resolved by router 'router:error response prepare' "
// In Class - 1
@Bean
public MarshallingWebServiceInboundGateway marshallingWebServiceInboundGateway(BeanFactoryChannelResolver channelResolver, Jaxb2Marshaller marshaller) {
MarshallingWebServiceInboundGateway wsInboundGateway = new MarshallingWebServiceInboundGateway();
wsInboundGateway.setRequestChannel(channelResolver.resolveDestination("incomingRequest.input"));
wsInboundGateway.setReplyChannel(channelResolver.resolveDestination("outgoingResponse.input"));
wsInboundGateway.setErrorChannel(channelResolver.resolveDestination("errorResponse.input"));
wsInboundGateway.setMarshaller(marshaller);
wsInboundGateway.setUnmarshaller(marshaller);
return wsInboundGateway;
}
// In Class - 2
@Bean
public IntegrationFlow incomingRequest() {
return f -> f.<Object, Class<?>>route(t -> t.getClass(),
mapping -> mapping.subFlowMapping(payloadType1(),
sf -> sf.gateway("type1.input", ConsumerEndpointSpec::transactional))
.subFlowMapping(payloadType2(),
sf -> sf.gateway("type2.input", ConsumerEndpointSpec::transactional)),
conf -> conf.id("router:Incoming request router"));
}
// In Class - 2
@Bean
public IntegrationFlow errorResponse(){
return f -> f.<MessageHandlingException, Object>route(t -> t.getFailedMessage().getHeaders().get("ABCDEF"),
mapping -> mapping.subFlowMapping("ABCDEF",
sf -> sf.gateway("customError.input", ConsumerEndpointSpec::transactional)),
conf -> conf.id("router:error response prepare"));
}
// In Class - 3
@Bean
public IntegrationFlow type1() {
IntegrationFlow integrationFlow = f -> f
.enrichHeaders(h -> h.header("ABCDEF", "ABCDEF", true))
.<Type1>handle((p, h) -> authentication.authenticate(p),
conf -> conf.id("service-activator:Authenticate"))
.transform(transformer::transformType1MsgToDataX,
conf -> conf.id("transform:Unmarshall type1 Message"))
.enrichHeaders(h -> h.headerExpression(TypeDataIntegrationMessageHeaderAccessor.MESSAGE_ID, "payload.id")
.headerExpression(TypeDataIntegrationMessageHeaderAccessor.MESSAGE_TYPE, "payload.messageType"))
.handle((GenericHandler<DataX>) repository::successResponseMessage,
conf -> conf.id("service-activator:return success"))
.channel("outgoingResponse.input")
;
return integrationFlow;
}
// In Class - 3
@Bean
public IntegrationFlow customError(){
return f -> f.handle((GenericHandler<MessageHandlingException>)eventRepository::failureResponseMessage,
conf -> conf.id("service-activator:return failure"));
}
编辑 - 2:
我尝试了Artem 的测试代码,它在这种情况下有效。如果我将 type1 流转换为子流映射如下(我这样做,因为我怀疑我的子流代码块),错误流无法打印 ABCDEF 参数值。
之后,我在子流映射中添加了另一个header(XYZTWR),但它也无法打印。
@Bean
public IntegrationFlow type1() {
return f -> f.<String, String>route(t -> t.toString(), mapping -> mapping.subFlowMapping("foo",
sf -> sf.gateway("fooFlow.input", ConsumerEndpointSpec::transactional).enrichHeaders(h -> h.header("XYZTRW", "XYZTRW", true))));
}
@Bean
public IntegrationFlow fooFlow() {
return f -> f.enrichHeaders(h -> h.header("ABCDEF", "ABCDEF", true))
.handle((p, h) -> {
throw new RuntimeException("intentional");
});
}
我的 S.OUT 是:
GenericMessage [payload=foo, headers={history=testGateway,type1.input, id=1fad7a65-4abe-c41d-0b22-36839a103269, timestamp=1503029553071}]
当我们将消息转移到不同的线程执行器或queue 通道时,errorChannel
header 开始工作。否则标准 throw
和 try...catch
在同一个调用堆栈中工作。
所以在你的情况下,身份验证异常只是抛给调用者 - WS Inbound Gateway。在这里你已经配置了全局错误通道。
我做了这个测试:
@Configuration
@EnableIntegration
@IntegrationComponentScan
public static class ContextConfiguration {
@Bean
public IntegrationFlow errorResponse() {
return IntegrationFlows.from(errorChannel())
.<MessagingException, Message<?>>transform(MessagingException::getFailedMessage,
e -> e.poller(p -> p.fixedDelay(100)))
.get();
}
@Bean
public IntegrationFlow type1() {
return f -> f
.enrichHeaders(h -> h.header("ABCDEF", "ABCDEF", true))
.handle((p, h) -> { throw new RuntimeException("intentional"); });
}
@Bean
public PollableChannel errorChannel() {
return new QueueChannel();
}
}
@MessagingGateway(errorChannel = "errorChannel", defaultRequestChannel = "type1.input")
public interface TestGateway {
Message<?> sendTest(String payload);
}
...
@Autowired
private TestGateway testGateway;
@Test
public void testErrorChannel() {
Message<?> message = this.testGateway.sendTest("foo");
System.out.println(message);
}
我的 SOUT 显示:
GenericMessage [payload=foo, headers={ABCDEF=ABCDEF, id=ae5d2d44-46b7-912d-17d4-bf2ee656140a, timestamp=1502999446725}]
请为 org.springframework.integration
类别设置 DEBUG 日志记录级别,并观察您的消息在哪一步丢失了所需的 headers。
更新
好的。我明白你的问题了。由于您使用 sf -> sf.gateway("fooFlow.input", ConsumerEndpointSpec::transactional)
,换句话说,您通过网关调用下游,您在那里所做的一切都在门后,如果发生错误,您可能会感到内疚,只有您发送到那里的内容 - 网关的请求消息.默认吞下下游failedMessage
要解决此问题,您应该考虑为 .gateway()
添加一个 errorChannel()
选项并在那里处理下游错误。或者...只是不要在路由器的子流中使用.gateway()
,而是简单的channel
映射。
.transactional()
也可以在任何 .handle()
上配置。
我使用 Spring 集成的 DSL 实现。 我有下面的代码,我不能使用我的自定义错误流程。当 authenticate 方法抛出 Runtime Exception 时,errorChannel 开始处理。我丰富了 header 以使用我的自定义错误流,但没有使用。
// In Class - 1
@Bean
public MarshallingWebServiceInboundGateway marshallingWebServiceInboundGateway(BeanFactoryChannelResolver channelResolver, Jaxb2Marshaller marshaller) {
MarshallingWebServiceInboundGateway wsInboundGateway = new MarshallingWebServiceInboundGateway();
wsInboundGateway.setRequestChannel(channelResolver.resolveDestination("incomingRequest.input"));
wsInboundGateway.setReplyChannel(channelResolver.resolveDestination("outgoingResponse.input"));
wsInboundGateway.setErrorChannel(channelResolver.resolveDestination("errorChannel"));
wsInboundGateway.setMarshaller(marshaller);
wsInboundGateway.setUnmarshaller(marshaller);
return wsInboundGateway;
}
// In Class - 2
@Bean
public IntegrationFlow incomingRequest() {
return f -> f.<Object, Class<?>>route(t -> t.getClass(),
mapping -> mapping.subFlowMapping(payloadType1(),
sf -> sf.gateway("type1.input", ConsumerEndpointSpec::transactional))
.subFlowMapping(payloadType2(),
sf -> sf.gateway("type2.input", ConsumerEndpointSpec::transactional)),
conf -> conf.id("router:Incoming request router"));
}
// In Class - 3
@Bean
public IntegrationFlow type1() {
IntegrationFlow integrationFlow = f -> f
.enrichHeaders(h -> h.header(MessageHeaders.ERROR_CHANNEL, "error222", true))
.<Type1>handle((p, h) -> authentication.authenticate(p),
conf -> conf.id("service-activator:Authenticate"))
.transform(transformer::transformType1MsgToDataX,
conf -> conf.id("transform:Unmarshall type1 Message"))
.enrichHeaders(h -> h.headerExpression(TypeDataIntegrationMessageHeaderAccessor.MESSAGE_ID, "payload.id")
.headerExpression(TypeDataIntegrationMessageHeaderAccessor.MESSAGE_TYPE, "payload.messageType"))
.handle((GenericHandler<DataX>) repository::successResponseMessage,
conf -> conf.id("service-activator:return success"))
.channel("outgoingResponse.input")
;
return integrationFlow;
}
// In Class - 3
@Bean
public IntegrationFlow error222Flow() {
return IntegrationFlows.from("error222").handle("repository", "failureResponseMessage").get()
;
}
编辑:
在 Artem 的回答之后,我的代码如下所示。但是,我仍然无法访问错误流中的 header 参数。我收到错误 - "No channel resolved by router 'router:error response prepare' "
// In Class - 1
@Bean
public MarshallingWebServiceInboundGateway marshallingWebServiceInboundGateway(BeanFactoryChannelResolver channelResolver, Jaxb2Marshaller marshaller) {
MarshallingWebServiceInboundGateway wsInboundGateway = new MarshallingWebServiceInboundGateway();
wsInboundGateway.setRequestChannel(channelResolver.resolveDestination("incomingRequest.input"));
wsInboundGateway.setReplyChannel(channelResolver.resolveDestination("outgoingResponse.input"));
wsInboundGateway.setErrorChannel(channelResolver.resolveDestination("errorResponse.input"));
wsInboundGateway.setMarshaller(marshaller);
wsInboundGateway.setUnmarshaller(marshaller);
return wsInboundGateway;
}
// In Class - 2
@Bean
public IntegrationFlow incomingRequest() {
return f -> f.<Object, Class<?>>route(t -> t.getClass(),
mapping -> mapping.subFlowMapping(payloadType1(),
sf -> sf.gateway("type1.input", ConsumerEndpointSpec::transactional))
.subFlowMapping(payloadType2(),
sf -> sf.gateway("type2.input", ConsumerEndpointSpec::transactional)),
conf -> conf.id("router:Incoming request router"));
}
// In Class - 2
@Bean
public IntegrationFlow errorResponse(){
return f -> f.<MessageHandlingException, Object>route(t -> t.getFailedMessage().getHeaders().get("ABCDEF"),
mapping -> mapping.subFlowMapping("ABCDEF",
sf -> sf.gateway("customError.input", ConsumerEndpointSpec::transactional)),
conf -> conf.id("router:error response prepare"));
}
// In Class - 3
@Bean
public IntegrationFlow type1() {
IntegrationFlow integrationFlow = f -> f
.enrichHeaders(h -> h.header("ABCDEF", "ABCDEF", true))
.<Type1>handle((p, h) -> authentication.authenticate(p),
conf -> conf.id("service-activator:Authenticate"))
.transform(transformer::transformType1MsgToDataX,
conf -> conf.id("transform:Unmarshall type1 Message"))
.enrichHeaders(h -> h.headerExpression(TypeDataIntegrationMessageHeaderAccessor.MESSAGE_ID, "payload.id")
.headerExpression(TypeDataIntegrationMessageHeaderAccessor.MESSAGE_TYPE, "payload.messageType"))
.handle((GenericHandler<DataX>) repository::successResponseMessage,
conf -> conf.id("service-activator:return success"))
.channel("outgoingResponse.input")
;
return integrationFlow;
}
// In Class - 3
@Bean
public IntegrationFlow customError(){
return f -> f.handle((GenericHandler<MessageHandlingException>)eventRepository::failureResponseMessage,
conf -> conf.id("service-activator:return failure"));
}
编辑 - 2:
我尝试了Artem 的测试代码,它在这种情况下有效。如果我将 type1 流转换为子流映射如下(我这样做,因为我怀疑我的子流代码块),错误流无法打印 ABCDEF 参数值。 之后,我在子流映射中添加了另一个header(XYZTWR),但它也无法打印。
@Bean
public IntegrationFlow type1() {
return f -> f.<String, String>route(t -> t.toString(), mapping -> mapping.subFlowMapping("foo",
sf -> sf.gateway("fooFlow.input", ConsumerEndpointSpec::transactional).enrichHeaders(h -> h.header("XYZTRW", "XYZTRW", true))));
}
@Bean
public IntegrationFlow fooFlow() {
return f -> f.enrichHeaders(h -> h.header("ABCDEF", "ABCDEF", true))
.handle((p, h) -> {
throw new RuntimeException("intentional");
});
}
我的 S.OUT 是:
GenericMessage [payload=foo, headers={history=testGateway,type1.input, id=1fad7a65-4abe-c41d-0b22-36839a103269, timestamp=1503029553071}]
当我们将消息转移到不同的线程执行器或queue 通道时,errorChannel
header 开始工作。否则标准 throw
和 try...catch
在同一个调用堆栈中工作。
所以在你的情况下,身份验证异常只是抛给调用者 - WS Inbound Gateway。在这里你已经配置了全局错误通道。
我做了这个测试:
@Configuration
@EnableIntegration
@IntegrationComponentScan
public static class ContextConfiguration {
@Bean
public IntegrationFlow errorResponse() {
return IntegrationFlows.from(errorChannel())
.<MessagingException, Message<?>>transform(MessagingException::getFailedMessage,
e -> e.poller(p -> p.fixedDelay(100)))
.get();
}
@Bean
public IntegrationFlow type1() {
return f -> f
.enrichHeaders(h -> h.header("ABCDEF", "ABCDEF", true))
.handle((p, h) -> { throw new RuntimeException("intentional"); });
}
@Bean
public PollableChannel errorChannel() {
return new QueueChannel();
}
}
@MessagingGateway(errorChannel = "errorChannel", defaultRequestChannel = "type1.input")
public interface TestGateway {
Message<?> sendTest(String payload);
}
...
@Autowired
private TestGateway testGateway;
@Test
public void testErrorChannel() {
Message<?> message = this.testGateway.sendTest("foo");
System.out.println(message);
}
我的 SOUT 显示:
GenericMessage [payload=foo, headers={ABCDEF=ABCDEF, id=ae5d2d44-46b7-912d-17d4-bf2ee656140a, timestamp=1502999446725}]
请为 org.springframework.integration
类别设置 DEBUG 日志记录级别,并观察您的消息在哪一步丢失了所需的 headers。
更新
好的。我明白你的问题了。由于您使用 sf -> sf.gateway("fooFlow.input", ConsumerEndpointSpec::transactional)
,换句话说,您通过网关调用下游,您在那里所做的一切都在门后,如果发生错误,您可能会感到内疚,只有您发送到那里的内容 - 网关的请求消息.默认吞下下游failedMessage
要解决此问题,您应该考虑为 .gateway()
添加一个 errorChannel()
选项并在那里处理下游错误。或者...只是不要在路由器的子流中使用.gateway()
,而是简单的channel
映射。
.transactional()
也可以在任何 .handle()
上配置。