@MessagingGateway,Spring Cloud Stream,以及两者的错误处理
@MessagingGateway, Spring Cloud Stream, and error handling across both
关于为 发布的答案,处理可以从 Spring Cloud Stream 服务返回的@MessagingGateway 错误的正确方法是什么?
回顾一下,我有一个 @MessagingGateway,它提供对使用 Spring Cloud Stream 构建的异步服务的同步访问。当我的 Spring Cloud Stream 服务层发生错误时,我会创建错误响应并通过 SubscribableChannel 将其发送到处理错误的其他 @StreamListener 服务。
例如,在创建帐户时,我向 accountCreated
频道发送消息。发生错误时,我会向 accountNotCreated
频道发送错误响应。
这很好用,但我还想向@MessagingGateway 的客户端发送错误响应,以便他们同步接收错误响应。 @MessagingGateway 注释有一个 errorChannel
属性,但 @Gateway 注释没有。因此,@MessagingGateway 的客户端应该能够阻止并等待 1) 创建帐户或 2) 错误响应。
同样,这里的目标是构建 "backend" 服务,利用 Spring Cloud Stream 进行交易服务(即创建、更新或删除数据的服务),同时提供我们的客户 "gateway" 访问该块并等待返回响应。 Artem Bilan 为我提供的解决方案适用于快乐路径,但当发生错误时,我不清楚 Spring 集成最适合如何处理此问题。
更新代码示例
GatewayApplication.java
package com.example.demo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.dsl.HeaderEnricherSpec;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
@EnableBinding({GatewayApplication.GatewayChannels.class})
@SpringBootApplication
public class GatewayApplication {
@Component
public interface GatewayChannels {
String TO_UPPERCASE_REPLY = "to-uppercase-reply";
String TO_UPPERCASE_REQUEST = "to-uppercase-request";
@Input(TO_UPPERCASE_REPLY)
SubscribableChannel toUppercaseReply();
@Output(TO_UPPERCASE_REQUEST)
SubscribableChannel toUppercaseRequest();
}
@MessagingGateway
public interface StreamGateway {
public static final String ENRICH = "enrich";
@Gateway(requestChannel = ENRICH, replyChannel = GatewayChannels.TO_UPPERCASE_REPLY)
StringWrapper process(StringWrapper payload) throws MyException;
}
@RestController
public class UppercaseController {
@Autowired
StreamGateway gateway;
@GetMapping(value = "/string/{string}",
produces = {MediaType.APPLICATION_JSON_VALUE, MediaType.APPLICATION_XML_VALUE})
public ResponseEntity<StringWrapper> getUser(@PathVariable("string") String string) {
try {
StringWrapper result = gateway.process(new StringWrapper(string));
// Instead of catching the exception in the below catch clause, here we have just a string
// representation of the stack trace when an exception occurs.
return new ResponseEntity<StringWrapper>(result, HttpStatus.OK);
} catch (MyException e) {
// Why is the exception not caught here?
return new ResponseEntity<StringWrapper>(new StringWrapper("An error has occurred"),
HttpStatus.INTERNAL_SERVER_ERROR);
}
}
}
public static void main(String[] args) {
SpringApplication.run(GatewayApplication.class, args);
}
@Bean
public IntegrationFlow headerEnricherFlow() {
return IntegrationFlows.from(StreamGateway.ENRICH)
.enrichHeaders(HeaderEnricherSpec::headerChannelsToString)
.channel(GatewayChannels.TO_UPPERCASE_REQUEST).get();
}
}
application.yml
spring:
cloud:
stream:
bindings:
to-uppercase-request:
destination: to-uppercase-request
content-type: application/json
group: stream-to-uppercase-request
to-uppercase-reply:
destination: to-uppercase-reply
content-type: application/json
producer:
required-groups: gateway-to-uppercase-reply, stream-listener-to-uppercase-reply
server:
port: 8088
StringWrapper.java(在所有三个项目中使用)
package com.example.demo;
import com.fasterxml.jackson.annotation.JsonProperty;
public class StringWrapper {
@JsonProperty
private String string;
@JsonProperty
private long time = System.currentTimeMillis();
public StringWrapper() {
super();
}
public StringWrapper(String string) {
this.string = string;
}
public String getString() {
return string;
}
public long getTime() {
return time;
}
public void setString(String string) {
this.string = string;
}
@Override
public String toString() {
return "StringWrapper [string=" + string + ", time=" + time + "]";
}
}
CloudStreamApplication.java
package com.example.demo;
import java.util.Random;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
@EnableBinding({CloudStreamApplication.CloudStreamChannels.class})
@SpringBootApplication
public class CloudStreamApplication {
@Component
interface CloudStreamChannels {
String TO_UPPERCASE_REPLY = "to-uppercase-reply";
String TO_UPPERCASE_REQUEST = "to-uppercase-request";
@Output(TO_UPPERCASE_REPLY)
SubscribableChannel toUppercaseReply();
@Input(TO_UPPERCASE_REQUEST)
SubscribableChannel toUppercaseRequest();
}
@Component
public class Processor {
@Autowired
CloudStreamChannels channels;
@StreamListener(CloudStreamChannels.TO_UPPERCASE_REQUEST)
public void process(Message<StringWrapper> request) {
StringWrapper uppercase = null;
try {
uppercase = toUppercase(request);
} catch (MyException e) {
channels.toUppercaseReply()
.send(MessageBuilder.withPayload(e).setHeader("__TypeId__", e.getClass().getName())
.copyHeaders(request.getHeaders()).build());
}
if (uppercase != null) {
channels.toUppercaseReply()
.send(MessageBuilder.withPayload(uppercase)
.setHeader("__TypeId__", StringWrapper.class.getName())
.copyHeaders(request.getHeaders()).build());
}
}
private StringWrapper toUppercase(Message<StringWrapper> request) throws MyException {
Random random = new Random();
int number = random.nextInt(50) + 1;
if (number > 25) {
throw new MyException("An error occurred.");
}
return new StringWrapper(request.getPayload().getString().toUpperCase());
}
}
public static void main(String[] args) {
SpringApplication.run(CloudStreamApplication.class, args);
}
}
application.yml
spring:
cloud:
stream:
bindings:
to-uppercase-request:
destination: to-uppercase-request
content-type: application/json
group: stream-to-uppercase-request
to-uppercase-reply:
destination: to-uppercase-reply
content-type: application/json
producer:
required-groups: gateway-to-uppercase-reply, stream-listener-to-uppercase-reply
server:
port: 8088
StreamListenerApplication.java
package com.example.demo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component;
@EnableBinding({StreamListenerApplication.CloudStreamChannels.class})
@SpringBootApplication
public class StreamListenerApplication {
@Component
interface CloudStreamChannels {
String TO_UPPERCASE_REPLY = "to-uppercase-reply";
@Input(TO_UPPERCASE_REPLY)
SubscribableChannel toUppercaseReply();
}
public static void main(String[] args) {
SpringApplication.run(StreamListenerApplication.class, args);
}
@Autowired
CloudStreamChannels channels;
@StreamListener(CloudStreamChannels.TO_UPPERCASE_REPLY)
public void processToUppercaseReply(Message<StringWrapper> message) {
System.out.println("Processing message: " + message.getPayload());
}
}
application.yml
spring:
cloud:
stream:
bindings:
to-uppercase-reply:
destination: to-uppercase-reply
content-type: application/json
group: stream-listener-to-uppercase-reply
server:
port: 8089
@MessagingGateway
上只有一个全局 errorChannel
用于所有 @Gateway
方法。如果您的网关有多个 @Gateway
方法,每个方法都可以设置一条消息 header 以指示哪个方法失败。
如果您发送 Message<Throwable>
到网关的回复通道(并且没有错误通道),有效负载将被抛出给调用者。
如果网关方法有一个 throws
子句,则会尝试解包原因树以查找该异常。
如果您添加一个 errorChannel
,而不是向调用者抛出异常,一个带有异常的 ErrorMessage
,因为它的负载被发送到错误通道 - 然后您可以做任何进一步的 post-processing 在错误通道流上,并根据需要向调用者抛出一些其他异常。不过,听起来你不需要那个。
所以,把它们放在一起...
- 让错误处理服务将一些消息发送到另一个目的地。
- 在网关服务中,为该目的地添加一个
@StreamListener
。
- 在
@StreamListener
中构建一个带有 Exception
负载的消息并将其发送到网关的回复通道。
- 网关随后会将负载抛给调用者。
像这样的东西应该有用...
@Gateway(requestChannel = ENRICH, replyChannel = GatewayChannels.TO_UPPERCASE_REPLY)
String process(String payload) throws MyException;
.
@StreamListener(CloudStreamChannels.TO_UPPERCASE_FAILURES)
public void failed(Message<FailInfo> failed) { // payload has info about the failure
Message m = MessageBuilder.withPayload(new MyException(failed.getPayload())).
.copyHeaders(failed.getHeaders())
.build();
this.reply.send(m); // send directly to the gateway's reply channel (perhaps @Autowired)
}
无论涉及多少远程服务,端到端地传播回复通道 header 很重要。
编辑
@SpringBootApplication
@EnableBinding(TwoAsyncPipes.class)
public class So47948454aApplication {
public static void main(String[] args) {
SpringApplication.run(So47948454aApplication.class, args).close();
}
@Bean
public ApplicationRunner runner(Gate gate) {
return args -> {
System.out.println(gate.foo(new Foo("foo")));
try {
gate.foo(new Foo("fail"));
}
catch (MyException e) {
System.out.println(e);
}
};
}
@MessagingGateway
public interface Gate {
@Gateway(requestChannel = "enrich", replyChannel = "transformed")
Foo foo(Foo foo) throws MyException;
}
@Bean
public IntegrationFlow headerEnricherFlow() {
return IntegrationFlows.from("enrich")
.enrichHeaders(HeaderEnricherSpec::headerChannelsToString)
.channel("gateOut").get();
}
@Bean
public MessageChannel transformed() {
return new DirectChannel();
}
@Transformer(inputChannel = "gateIn", outputChannel = "transformed")
public Object jsonToObject(Message<?> in) {
return jtot().transform(in);
}
@Bean
public JsonToObjectTransformer jtot() {
return new JsonToObjectTransformer();
}
@StreamListener("serviceIn")
@SendTo("serviceOut")
public Message<?> listen(Foo in) {
if (in.foo.equals("fail")) {
return MessageBuilder.withPayload(new MyException("failed"))
.setHeader(JsonHeaders.TYPE_ID,
MyException.class.getName())
.build();
}
else {
return MessageBuilder.withPayload(new Foo("bar"))
.setHeader(JsonHeaders.TYPE_ID,
Foo.class.getName())
.build();
}
}
public static class Foo {
String foo;
public Foo() {
super();
}
public Foo(String foo) {
this.foo = foo;
}
public String getFoo() {
return this.foo;
}
public void setFoo(String foo) {
this.foo = foo;
}
@Override
public String toString() {
return "Foo [foo=" + this.foo + "]";
}
}
@SuppressWarnings("serial")
public static class MyException extends RuntimeException {
private String error;
public MyException() {
super();
}
public MyException(String error) {
this.error = error;
}
public String getError() {
return this.error;
}
public void setError(String error) {
this.error = error;
}
@Override
public String toString() {
return "MyException [error=" + this.error + "]";
}
}
public interface TwoAsyncPipes {
@Output("gateOut")
MessageChannel gateOut();
@Input("serviceIn")
MessageChannel serviceIn();
@Output("serviceOut")
MessageChannel serviceOut();
@Input("gateIn")
MessageChannel gateIn();
}
}
和
Foo [foo=bar]
MyException [error=failed]
POM
http://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0
<groupId>com.example</groupId>
<artifactId>so47948454a</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>so47948454a</name>
<description>Demo project for Spring Boot</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.9.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<spring-cloud.version>Edgware.RELEASE</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-java-dsl</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
兔子活页夹 1.3.0.RELEASE
Spring 集成 4.3.12
2017-12-26 13:56:18.121 INFO 39008 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: SpringAMQP#7e87ef9e:0/SimpleConnection@45843650 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 60995]
Foo [foo=bar]
MyException [error=failed]
2017-12-26 13:56:18.165 INFO 39008 --- [ main] com.example.So47948454aApplication : Started So47948454aApplication in 3.422 seconds (JVM running for 3.858)
application.yml:
spring:
cloud:
stream:
bindings:
gateIn:
destination: serviceOut
content-type: application/json
gateOut:
destination: serviceIn
content-type: application/json
serviceIn:
destination: serviceIn
content-type: application/json
serviceOut:
destination: serviceOut
content-type: application/json
关于为
回顾一下,我有一个 @MessagingGateway,它提供对使用 Spring Cloud Stream 构建的异步服务的同步访问。当我的 Spring Cloud Stream 服务层发生错误时,我会创建错误响应并通过 SubscribableChannel 将其发送到处理错误的其他 @StreamListener 服务。
例如,在创建帐户时,我向 accountCreated
频道发送消息。发生错误时,我会向 accountNotCreated
频道发送错误响应。
这很好用,但我还想向@MessagingGateway 的客户端发送错误响应,以便他们同步接收错误响应。 @MessagingGateway 注释有一个 errorChannel
属性,但 @Gateway 注释没有。因此,@MessagingGateway 的客户端应该能够阻止并等待 1) 创建帐户或 2) 错误响应。
同样,这里的目标是构建 "backend" 服务,利用 Spring Cloud Stream 进行交易服务(即创建、更新或删除数据的服务),同时提供我们的客户 "gateway" 访问该块并等待返回响应。 Artem Bilan 为我提供的解决方案适用于快乐路径,但当发生错误时,我不清楚 Spring 集成最适合如何处理此问题。
更新代码示例
GatewayApplication.java
package com.example.demo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.dsl.HeaderEnricherSpec;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
@EnableBinding({GatewayApplication.GatewayChannels.class})
@SpringBootApplication
public class GatewayApplication {
@Component
public interface GatewayChannels {
String TO_UPPERCASE_REPLY = "to-uppercase-reply";
String TO_UPPERCASE_REQUEST = "to-uppercase-request";
@Input(TO_UPPERCASE_REPLY)
SubscribableChannel toUppercaseReply();
@Output(TO_UPPERCASE_REQUEST)
SubscribableChannel toUppercaseRequest();
}
@MessagingGateway
public interface StreamGateway {
public static final String ENRICH = "enrich";
@Gateway(requestChannel = ENRICH, replyChannel = GatewayChannels.TO_UPPERCASE_REPLY)
StringWrapper process(StringWrapper payload) throws MyException;
}
@RestController
public class UppercaseController {
@Autowired
StreamGateway gateway;
@GetMapping(value = "/string/{string}",
produces = {MediaType.APPLICATION_JSON_VALUE, MediaType.APPLICATION_XML_VALUE})
public ResponseEntity<StringWrapper> getUser(@PathVariable("string") String string) {
try {
StringWrapper result = gateway.process(new StringWrapper(string));
// Instead of catching the exception in the below catch clause, here we have just a string
// representation of the stack trace when an exception occurs.
return new ResponseEntity<StringWrapper>(result, HttpStatus.OK);
} catch (MyException e) {
// Why is the exception not caught here?
return new ResponseEntity<StringWrapper>(new StringWrapper("An error has occurred"),
HttpStatus.INTERNAL_SERVER_ERROR);
}
}
}
public static void main(String[] args) {
SpringApplication.run(GatewayApplication.class, args);
}
@Bean
public IntegrationFlow headerEnricherFlow() {
return IntegrationFlows.from(StreamGateway.ENRICH)
.enrichHeaders(HeaderEnricherSpec::headerChannelsToString)
.channel(GatewayChannels.TO_UPPERCASE_REQUEST).get();
}
}
application.yml
spring:
cloud:
stream:
bindings:
to-uppercase-request:
destination: to-uppercase-request
content-type: application/json
group: stream-to-uppercase-request
to-uppercase-reply:
destination: to-uppercase-reply
content-type: application/json
producer:
required-groups: gateway-to-uppercase-reply, stream-listener-to-uppercase-reply
server:
port: 8088
StringWrapper.java(在所有三个项目中使用)
package com.example.demo;
import com.fasterxml.jackson.annotation.JsonProperty;
public class StringWrapper {
@JsonProperty
private String string;
@JsonProperty
private long time = System.currentTimeMillis();
public StringWrapper() {
super();
}
public StringWrapper(String string) {
this.string = string;
}
public String getString() {
return string;
}
public long getTime() {
return time;
}
public void setString(String string) {
this.string = string;
}
@Override
public String toString() {
return "StringWrapper [string=" + string + ", time=" + time + "]";
}
}
CloudStreamApplication.java
package com.example.demo;
import java.util.Random;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
@EnableBinding({CloudStreamApplication.CloudStreamChannels.class})
@SpringBootApplication
public class CloudStreamApplication {
@Component
interface CloudStreamChannels {
String TO_UPPERCASE_REPLY = "to-uppercase-reply";
String TO_UPPERCASE_REQUEST = "to-uppercase-request";
@Output(TO_UPPERCASE_REPLY)
SubscribableChannel toUppercaseReply();
@Input(TO_UPPERCASE_REQUEST)
SubscribableChannel toUppercaseRequest();
}
@Component
public class Processor {
@Autowired
CloudStreamChannels channels;
@StreamListener(CloudStreamChannels.TO_UPPERCASE_REQUEST)
public void process(Message<StringWrapper> request) {
StringWrapper uppercase = null;
try {
uppercase = toUppercase(request);
} catch (MyException e) {
channels.toUppercaseReply()
.send(MessageBuilder.withPayload(e).setHeader("__TypeId__", e.getClass().getName())
.copyHeaders(request.getHeaders()).build());
}
if (uppercase != null) {
channels.toUppercaseReply()
.send(MessageBuilder.withPayload(uppercase)
.setHeader("__TypeId__", StringWrapper.class.getName())
.copyHeaders(request.getHeaders()).build());
}
}
private StringWrapper toUppercase(Message<StringWrapper> request) throws MyException {
Random random = new Random();
int number = random.nextInt(50) + 1;
if (number > 25) {
throw new MyException("An error occurred.");
}
return new StringWrapper(request.getPayload().getString().toUpperCase());
}
}
public static void main(String[] args) {
SpringApplication.run(CloudStreamApplication.class, args);
}
}
application.yml
spring:
cloud:
stream:
bindings:
to-uppercase-request:
destination: to-uppercase-request
content-type: application/json
group: stream-to-uppercase-request
to-uppercase-reply:
destination: to-uppercase-reply
content-type: application/json
producer:
required-groups: gateway-to-uppercase-reply, stream-listener-to-uppercase-reply
server:
port: 8088
StreamListenerApplication.java
package com.example.demo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component;
@EnableBinding({StreamListenerApplication.CloudStreamChannels.class})
@SpringBootApplication
public class StreamListenerApplication {
@Component
interface CloudStreamChannels {
String TO_UPPERCASE_REPLY = "to-uppercase-reply";
@Input(TO_UPPERCASE_REPLY)
SubscribableChannel toUppercaseReply();
}
public static void main(String[] args) {
SpringApplication.run(StreamListenerApplication.class, args);
}
@Autowired
CloudStreamChannels channels;
@StreamListener(CloudStreamChannels.TO_UPPERCASE_REPLY)
public void processToUppercaseReply(Message<StringWrapper> message) {
System.out.println("Processing message: " + message.getPayload());
}
}
application.yml
spring:
cloud:
stream:
bindings:
to-uppercase-reply:
destination: to-uppercase-reply
content-type: application/json
group: stream-listener-to-uppercase-reply
server:
port: 8089
@MessagingGateway
上只有一个全局 errorChannel
用于所有 @Gateway
方法。如果您的网关有多个 @Gateway
方法,每个方法都可以设置一条消息 header 以指示哪个方法失败。
如果您发送 Message<Throwable>
到网关的回复通道(并且没有错误通道),有效负载将被抛出给调用者。
如果网关方法有一个 throws
子句,则会尝试解包原因树以查找该异常。
如果您添加一个 errorChannel
,而不是向调用者抛出异常,一个带有异常的 ErrorMessage
,因为它的负载被发送到错误通道 - 然后您可以做任何进一步的 post-processing 在错误通道流上,并根据需要向调用者抛出一些其他异常。不过,听起来你不需要那个。
所以,把它们放在一起...
- 让错误处理服务将一些消息发送到另一个目的地。
- 在网关服务中,为该目的地添加一个
@StreamListener
。 - 在
@StreamListener
中构建一个带有Exception
负载的消息并将其发送到网关的回复通道。 - 网关随后会将负载抛给调用者。
像这样的东西应该有用...
@Gateway(requestChannel = ENRICH, replyChannel = GatewayChannels.TO_UPPERCASE_REPLY)
String process(String payload) throws MyException;
.
@StreamListener(CloudStreamChannels.TO_UPPERCASE_FAILURES)
public void failed(Message<FailInfo> failed) { // payload has info about the failure
Message m = MessageBuilder.withPayload(new MyException(failed.getPayload())).
.copyHeaders(failed.getHeaders())
.build();
this.reply.send(m); // send directly to the gateway's reply channel (perhaps @Autowired)
}
无论涉及多少远程服务,端到端地传播回复通道 header 很重要。
编辑
@SpringBootApplication
@EnableBinding(TwoAsyncPipes.class)
public class So47948454aApplication {
public static void main(String[] args) {
SpringApplication.run(So47948454aApplication.class, args).close();
}
@Bean
public ApplicationRunner runner(Gate gate) {
return args -> {
System.out.println(gate.foo(new Foo("foo")));
try {
gate.foo(new Foo("fail"));
}
catch (MyException e) {
System.out.println(e);
}
};
}
@MessagingGateway
public interface Gate {
@Gateway(requestChannel = "enrich", replyChannel = "transformed")
Foo foo(Foo foo) throws MyException;
}
@Bean
public IntegrationFlow headerEnricherFlow() {
return IntegrationFlows.from("enrich")
.enrichHeaders(HeaderEnricherSpec::headerChannelsToString)
.channel("gateOut").get();
}
@Bean
public MessageChannel transformed() {
return new DirectChannel();
}
@Transformer(inputChannel = "gateIn", outputChannel = "transformed")
public Object jsonToObject(Message<?> in) {
return jtot().transform(in);
}
@Bean
public JsonToObjectTransformer jtot() {
return new JsonToObjectTransformer();
}
@StreamListener("serviceIn")
@SendTo("serviceOut")
public Message<?> listen(Foo in) {
if (in.foo.equals("fail")) {
return MessageBuilder.withPayload(new MyException("failed"))
.setHeader(JsonHeaders.TYPE_ID,
MyException.class.getName())
.build();
}
else {
return MessageBuilder.withPayload(new Foo("bar"))
.setHeader(JsonHeaders.TYPE_ID,
Foo.class.getName())
.build();
}
}
public static class Foo {
String foo;
public Foo() {
super();
}
public Foo(String foo) {
this.foo = foo;
}
public String getFoo() {
return this.foo;
}
public void setFoo(String foo) {
this.foo = foo;
}
@Override
public String toString() {
return "Foo [foo=" + this.foo + "]";
}
}
@SuppressWarnings("serial")
public static class MyException extends RuntimeException {
private String error;
public MyException() {
super();
}
public MyException(String error) {
this.error = error;
}
public String getError() {
return this.error;
}
public void setError(String error) {
this.error = error;
}
@Override
public String toString() {
return "MyException [error=" + this.error + "]";
}
}
public interface TwoAsyncPipes {
@Output("gateOut")
MessageChannel gateOut();
@Input("serviceIn")
MessageChannel serviceIn();
@Output("serviceOut")
MessageChannel serviceOut();
@Input("gateIn")
MessageChannel gateIn();
}
}
和
Foo [foo=bar]
MyException [error=failed]
POM
http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0
<groupId>com.example</groupId>
<artifactId>so47948454a</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>so47948454a</name>
<description>Demo project for Spring Boot</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.9.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<spring-cloud.version>Edgware.RELEASE</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-java-dsl</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
兔子活页夹 1.3.0.RELEASE Spring 集成 4.3.12
2017-12-26 13:56:18.121 INFO 39008 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: SpringAMQP#7e87ef9e:0/SimpleConnection@45843650 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 60995]
Foo [foo=bar]
MyException [error=failed]
2017-12-26 13:56:18.165 INFO 39008 --- [ main] com.example.So47948454aApplication : Started So47948454aApplication in 3.422 seconds (JVM running for 3.858)
application.yml:
spring:
cloud:
stream:
bindings:
gateIn:
destination: serviceOut
content-type: application/json
gateOut:
destination: serviceIn
content-type: application/json
serviceIn:
destination: serviceIn
content-type: application/json
serviceOut:
destination: serviceOut
content-type: application/json