如何使用 Spring Cloud Stream MessageChannels 配置 @MessagingGateway?
How can @MessagingGateway be configured with Spring Cloud Stream MessageChannels?
我已经开发了异步 Spring Cloud Stream 服务,我正在尝试开发一种边缘服务,该服务使用 @MessagingGateway 提供对本质上异步的服务的同步访问。
我目前得到以下堆栈跟踪:
Caused by: org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:355)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:271)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:188)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
... 47 common frames omitted
我的@MessagingGateway:
@EnableBinding(AccountChannels.class)
@MessagingGateway
public interface AccountService {
@Gateway(requestChannel = AccountChannels.CREATE_ACCOUNT_REQUEST,replyChannel = AccountChannels.ACCOUNT_CREATED, replyTimeout = 60000, requestTimeout = 60000)
Account createAccount(@Payload Account account, @Header("Authorization") String authorization);
}
如果我通过@StreamListener 在回复通道上使用消息,它工作得很好:
@HystrixCommand(commandKey = "acounts-edge:accountCreated", fallbackMethod = "accountCreatedFallback", commandProperties = {@HystrixProperty(name = "execution.isolation.strategy", value = "SEMAPHORE")}, ignoreExceptions = {ClientException.class})
@StreamListener(AccountChannels.ACCOUNT_CREATED)
public void accountCreated(Account account, @Header(name = "spanTraceId", required = false) String traceId) {
try {
if (log.isInfoEnabled()) {
log.info(new StringBuilder("Account created: ").append(objectMapper.writeValueAsString(account)).toString());
}
} catch (JsonProcessingException e) {
log.error(e.getMessage(), e);
}
}
在生产者这边,我配置requiredGroups
保证多个消费者可以处理消息,相应的,消费者有匹配的group
配置。
消费者:
spring:
cloud:
stream:
bindings:
create-account-request:
binder: rabbit1
contentType: application/json
destination: create-account-request
requiredGroups: accounts-service-create-account-request
account-created:
binder: rabbit1
contentType: application/json
destination: account-created
group: accounts-edge-account-created
制作人:
spring:
cloud:
stream:
bindings:
create-account-request:
binder: rabbit1
contentType: application/json
destination: create-account-request
group: accounts-service-create-account-request
account-created:
binder: rabbit1
contentType: application/json
destination: account-created
requiredGroups: accounts-edge-account-created
生产者端处理请求并发送响应的代码位:
accountChannels.accountCreated().send(MessageBuilder.withPayload(accountService.createAccount(account)).build());
我可以调试并看到请求被接收和处理,但是当响应被发送到回复通道时,就是错误发生的时候。
要使@MessagingGateway 正常工作,我缺少哪些配置 and/or 代码?我知道我正在结合使用 Spring 集成和 Spring 云网关,所以我不确定将它们一起使用是否会导致问题。
嗯,我对你想要完成的事情有点困惑,但让我们看看我们是否能解决这个问题。
混合 SI 和 SCSt 绝对是自然的,因为一个是建立在另一个之上的,所以都应该工作:
这是我刚刚从一个旧示例中挖出的示例代码片段,它公开了 REST 端点,但委托(通过网关)到 Source 的输出通道。看看是否有帮助:
@EnableBinding(Source.class)
@SpringBootApplication
@RestController
public class FooApplication {
. . .
@Autowired
private Source channels;
@Autowired
private CompletionService completionService;
@RequestMapping("/complete")
public String completeRequest(@RequestParam int id) {
this.completionService.complete("foo");
return "OK";
}
@MessagingGateway
interface CompletionService {
@Gateway(requestChannel = Source.OUTPUT)
void complete(String message);
}
}
这是个好问题,也是个好主意。但它不会那么容易工作。
首先我们必须自己确定gateway
意味着request/reply
,因此correlation
。这在 @MessagingGateway
中可以通过 replyChannel
header 面对 TemporaryReplyChannel
实例。即使您有明确的 replyChannel = AccountChannels.ACCOUNT_CREATED
,关联也只能通过提到的 header 及其值来完成。事实上,这个 TemporaryReplyChannel
不可序列化,无法通过网络传输给另一端的消费者。
幸运的是Spring集成为我们提供了一些解决方案。它是 HeaderEnricher
的一部分,它的 headerChannelsToString
选项位于 HeaderChannelRegistry
后面:
Starting with Spring Integration 3.0, a new sub-element <int:header-channels-to-string/>
is available; it has no attributes. This converts existing replyChannel and errorChannel headers (when they are a MessageChannel) to a String and stores the channel(s) in a registry for later resolution when it is time to send a reply, or handle an error. This is useful for cases where the headers might be lost; for example when serializing a message into a message store or when transporting the message over JMS. If the header does not already exist, or it is not a MessageChannel, no changes are made.
但在这种情况下,您必须引入一个从网关到 HeaderEnricher
的内部通道,并且只有最后一个通道会将消息发送到 AccountChannels.CREATE_ACCOUNT_REQUEST
。因此,replyChannel
header 将被转换为字符串表示形式并能够在网络上传输。在消费者方面,当你发送回复时,你应该确保你也传输了 replyChannel
header,因为它是。因此,当消息到达生产者端的 AccountChannels.ACCOUNT_CREATED
时,我们有 @MessagingGateway
,关联机制能够将通道标识符转换为正确的 TemporaryReplyChannel
并将回复等待中的网关呼叫。
这里唯一的问题是您的生产者应用程序必须是 AccountChannels.ACCOUNT_CREATED
组中的单个消费者 - 我们必须确保云中一次只有一个实例在运行。仅仅因为只有一个实例的内存中有 TemporaryReplyChannel
。
更新
一些帮助代码:
@EnableBinding(AccountChannels.class)
@MessagingGateway
public interface AccountService {
@Gateway(requestChannel = AccountChannels.INTERNAL_CREATE_ACCOUNT_REQUEST, replyChannel = AccountChannels.ACCOUNT_CREATED, replyTimeout = 60000, requestTimeout = 60000)
Account createAccount(@Payload Account account, @Header("Authorization") String authorization);
}
@Bean
public IntegrationFlow headerEnricherFlow() {
return IntegrationFlows.from(AccountChannels.INTERNAL_CREATE_ACCOUNT_REQUEST)
.enrichHeaders(headerEnricher -> headerEnricher.headerChannelsToString())
.channel(AccountChannels.CREATE_ACCOUNT_REQUEST)
.get();
}
更新
演示 PoC 的一些简单应用程序:
@EnableBinding({ Processor.class, CloudStreamGatewayApplication.GatewayChannels.class })
@SpringBootApplication
public class CloudStreamGatewayApplication {
interface GatewayChannels {
String REQUEST = "request";
@Output(REQUEST)
MessageChannel request();
String REPLY = "reply";
@Input(REPLY)
SubscribableChannel reply();
}
private static final String ENRICH = "enrich";
@MessagingGateway
public interface StreamGateway {
@Gateway(requestChannel = ENRICH, replyChannel = GatewayChannels.REPLY)
String process(String payload);
}
@Bean
public IntegrationFlow headerEnricherFlow() {
return IntegrationFlows.from(ENRICH)
.enrichHeaders(HeaderEnricherSpec::headerChannelsToString)
.channel(GatewayChannels.REQUEST)
.get();
}
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public Message<?> process(Message<String> request) {
return MessageBuilder.withPayload(request.getPayload().toUpperCase())
.copyHeaders(request.getHeaders())
.build();
}
public static void main(String[] args) {
ConfigurableApplicationContext applicationContext =
SpringApplication.run(CloudStreamGatewayApplication.class, args);
StreamGateway gateway = applicationContext.getBean(StreamGateway.class);
String result = gateway.process("foo");
System.out.println(result);
}
}
application.yml
:
spring:
cloud:
stream:
bindings:
input:
destination: requests
output:
destination: replies
request:
destination: requests
reply:
destination: replies
我用spring-cloud-starter-stream-rabbit
.
MessageBuilder.withPayload(request.getPayload().toUpperCase())
.copyHeaders(request.getHeaders())
.build()
将技巧复制请求 header 发送到回复消息。因此,网关能够在回复端将 header 中的通道标识符转换为适当的 TemporaryReplyChannel
以将回复正确地传送给网关的调用者。
关于此事的 SCSt 问题:https://github.com/spring-cloud/spring-cloud-stream/issues/815
在 Artem 的帮助下,我找到了我一直在寻找的解决方案。我已经将 Artem 发布的代码分成两个服务,一个网关服务和一个 CloudStream 服务。我还添加了一个 @RestController
用于测试目的。这基本上模仿了我想用持久队列做的事情。感谢 Artem 的帮助!我真的很感谢你的时间!我希望这可以帮助其他想要做同样事情的人。
网关代码
package com.example.demo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.dsl.HeaderEnricherSpec;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
@EnableBinding({GatewayApplication.GatewayChannels.class})
@SpringBootApplication
public class GatewayApplication {
interface GatewayChannels {
String TO_UPPERCASE_REPLY = "to-uppercase-reply";
String TO_UPPERCASE_REQUEST = "to-uppercase-request";
@Input(TO_UPPERCASE_REPLY)
SubscribableChannel toUppercaseReply();
@Output(TO_UPPERCASE_REQUEST)
MessageChannel toUppercaseRequest();
}
@MessagingGateway
public interface StreamGateway {
@Gateway(requestChannel = ENRICH, replyChannel = GatewayChannels.TO_UPPERCASE_REPLY)
String process(String payload);
}
private static final String ENRICH = "enrich";
public static void main(String[] args) {
SpringApplication.run(GatewayApplication.class, args);
}
@Bean
public IntegrationFlow headerEnricherFlow() {
return IntegrationFlows.from(ENRICH).enrichHeaders(HeaderEnricherSpec::headerChannelsToString)
.channel(GatewayChannels.TO_UPPERCASE_REQUEST).get();
}
@RestController
public class UppercaseController {
@Autowired
StreamGateway gateway;
@GetMapping(value = "/string/{string}",
produces = {MediaType.APPLICATION_JSON_VALUE, MediaType.APPLICATION_XML_VALUE})
public ResponseEntity<String> getUser(@PathVariable("string") String string) {
return new ResponseEntity<String>(gateway.process(string), HttpStatus.OK);
}
}
}
网关配置(application.yml)
spring:
cloud:
stream:
bindings:
to-uppercase-request:
destination: to-uppercase-request
producer:
required-groups: stream-to-uppercase-request
to-uppercase-reply:
destination: to-uppercase-reply
group: gateway-to-uppercase-reply
server:
port: 8080
CloudStream 代码
package com.example.demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.support.MessageBuilder;
@EnableBinding({CloudStreamApplication.CloudStreamChannels.class})
@SpringBootApplication
public class CloudStreamApplication {
interface CloudStreamChannels {
String TO_UPPERCASE_REPLY = "to-uppercase-reply";
String TO_UPPERCASE_REQUEST = "to-uppercase-request";
@Output(TO_UPPERCASE_REPLY)
SubscribableChannel toUppercaseReply();
@Input(TO_UPPERCASE_REQUEST)
MessageChannel toUppercaseRequest();
}
public static void main(String[] args) {
SpringApplication.run(CloudStreamApplication.class, args);
}
@StreamListener(CloudStreamChannels.TO_UPPERCASE_REQUEST)
@SendTo(CloudStreamChannels.TO_UPPERCASE_REPLY)
public Message<?> process(Message<String> request) {
return MessageBuilder.withPayload(request.getPayload().toUpperCase())
.copyHeaders(request.getHeaders()).build();
}
}
CloudStream 配置 (application.yml)
spring:
cloud:
stream:
bindings:
to-uppercase-request:
destination: to-uppercase-request
group: stream-to-uppercase-request
to-uppercase-reply:
destination: to-uppercase-reply
producer:
required-groups: gateway-to-uppercase-reply
server:
port: 8081
我已经开发了异步 Spring Cloud Stream 服务,我正在尝试开发一种边缘服务,该服务使用 @MessagingGateway 提供对本质上异步的服务的同步访问。
我目前得到以下堆栈跟踪:
Caused by: org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:355)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:271)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:188)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
... 47 common frames omitted
我的@MessagingGateway:
@EnableBinding(AccountChannels.class)
@MessagingGateway
public interface AccountService {
@Gateway(requestChannel = AccountChannels.CREATE_ACCOUNT_REQUEST,replyChannel = AccountChannels.ACCOUNT_CREATED, replyTimeout = 60000, requestTimeout = 60000)
Account createAccount(@Payload Account account, @Header("Authorization") String authorization);
}
如果我通过@StreamListener 在回复通道上使用消息,它工作得很好:
@HystrixCommand(commandKey = "acounts-edge:accountCreated", fallbackMethod = "accountCreatedFallback", commandProperties = {@HystrixProperty(name = "execution.isolation.strategy", value = "SEMAPHORE")}, ignoreExceptions = {ClientException.class})
@StreamListener(AccountChannels.ACCOUNT_CREATED)
public void accountCreated(Account account, @Header(name = "spanTraceId", required = false) String traceId) {
try {
if (log.isInfoEnabled()) {
log.info(new StringBuilder("Account created: ").append(objectMapper.writeValueAsString(account)).toString());
}
} catch (JsonProcessingException e) {
log.error(e.getMessage(), e);
}
}
在生产者这边,我配置requiredGroups
保证多个消费者可以处理消息,相应的,消费者有匹配的group
配置。
消费者:
spring:
cloud:
stream:
bindings:
create-account-request:
binder: rabbit1
contentType: application/json
destination: create-account-request
requiredGroups: accounts-service-create-account-request
account-created:
binder: rabbit1
contentType: application/json
destination: account-created
group: accounts-edge-account-created
制作人:
spring:
cloud:
stream:
bindings:
create-account-request:
binder: rabbit1
contentType: application/json
destination: create-account-request
group: accounts-service-create-account-request
account-created:
binder: rabbit1
contentType: application/json
destination: account-created
requiredGroups: accounts-edge-account-created
生产者端处理请求并发送响应的代码位:
accountChannels.accountCreated().send(MessageBuilder.withPayload(accountService.createAccount(account)).build());
我可以调试并看到请求被接收和处理,但是当响应被发送到回复通道时,就是错误发生的时候。
要使@MessagingGateway 正常工作,我缺少哪些配置 and/or 代码?我知道我正在结合使用 Spring 集成和 Spring 云网关,所以我不确定将它们一起使用是否会导致问题。
嗯,我对你想要完成的事情有点困惑,但让我们看看我们是否能解决这个问题。 混合 SI 和 SCSt 绝对是自然的,因为一个是建立在另一个之上的,所以都应该工作: 这是我刚刚从一个旧示例中挖出的示例代码片段,它公开了 REST 端点,但委托(通过网关)到 Source 的输出通道。看看是否有帮助:
@EnableBinding(Source.class)
@SpringBootApplication
@RestController
public class FooApplication {
. . .
@Autowired
private Source channels;
@Autowired
private CompletionService completionService;
@RequestMapping("/complete")
public String completeRequest(@RequestParam int id) {
this.completionService.complete("foo");
return "OK";
}
@MessagingGateway
interface CompletionService {
@Gateway(requestChannel = Source.OUTPUT)
void complete(String message);
}
}
这是个好问题,也是个好主意。但它不会那么容易工作。
首先我们必须自己确定gateway
意味着request/reply
,因此correlation
。这在 @MessagingGateway
中可以通过 replyChannel
header 面对 TemporaryReplyChannel
实例。即使您有明确的 replyChannel = AccountChannels.ACCOUNT_CREATED
,关联也只能通过提到的 header 及其值来完成。事实上,这个 TemporaryReplyChannel
不可序列化,无法通过网络传输给另一端的消费者。
幸运的是Spring集成为我们提供了一些解决方案。它是 HeaderEnricher
的一部分,它的 headerChannelsToString
选项位于 HeaderChannelRegistry
后面:
Starting with Spring Integration 3.0, a new sub-element
<int:header-channels-to-string/>
is available; it has no attributes. This converts existing replyChannel and errorChannel headers (when they are a MessageChannel) to a String and stores the channel(s) in a registry for later resolution when it is time to send a reply, or handle an error. This is useful for cases where the headers might be lost; for example when serializing a message into a message store or when transporting the message over JMS. If the header does not already exist, or it is not a MessageChannel, no changes are made.
但在这种情况下,您必须引入一个从网关到 HeaderEnricher
的内部通道,并且只有最后一个通道会将消息发送到 AccountChannels.CREATE_ACCOUNT_REQUEST
。因此,replyChannel
header 将被转换为字符串表示形式并能够在网络上传输。在消费者方面,当你发送回复时,你应该确保你也传输了 replyChannel
header,因为它是。因此,当消息到达生产者端的 AccountChannels.ACCOUNT_CREATED
时,我们有 @MessagingGateway
,关联机制能够将通道标识符转换为正确的 TemporaryReplyChannel
并将回复等待中的网关呼叫。
这里唯一的问题是您的生产者应用程序必须是 AccountChannels.ACCOUNT_CREATED
组中的单个消费者 - 我们必须确保云中一次只有一个实例在运行。仅仅因为只有一个实例的内存中有 TemporaryReplyChannel
。
更新
一些帮助代码:
@EnableBinding(AccountChannels.class)
@MessagingGateway
public interface AccountService {
@Gateway(requestChannel = AccountChannels.INTERNAL_CREATE_ACCOUNT_REQUEST, replyChannel = AccountChannels.ACCOUNT_CREATED, replyTimeout = 60000, requestTimeout = 60000)
Account createAccount(@Payload Account account, @Header("Authorization") String authorization);
}
@Bean
public IntegrationFlow headerEnricherFlow() {
return IntegrationFlows.from(AccountChannels.INTERNAL_CREATE_ACCOUNT_REQUEST)
.enrichHeaders(headerEnricher -> headerEnricher.headerChannelsToString())
.channel(AccountChannels.CREATE_ACCOUNT_REQUEST)
.get();
}
更新
演示 PoC 的一些简单应用程序:
@EnableBinding({ Processor.class, CloudStreamGatewayApplication.GatewayChannels.class })
@SpringBootApplication
public class CloudStreamGatewayApplication {
interface GatewayChannels {
String REQUEST = "request";
@Output(REQUEST)
MessageChannel request();
String REPLY = "reply";
@Input(REPLY)
SubscribableChannel reply();
}
private static final String ENRICH = "enrich";
@MessagingGateway
public interface StreamGateway {
@Gateway(requestChannel = ENRICH, replyChannel = GatewayChannels.REPLY)
String process(String payload);
}
@Bean
public IntegrationFlow headerEnricherFlow() {
return IntegrationFlows.from(ENRICH)
.enrichHeaders(HeaderEnricherSpec::headerChannelsToString)
.channel(GatewayChannels.REQUEST)
.get();
}
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public Message<?> process(Message<String> request) {
return MessageBuilder.withPayload(request.getPayload().toUpperCase())
.copyHeaders(request.getHeaders())
.build();
}
public static void main(String[] args) {
ConfigurableApplicationContext applicationContext =
SpringApplication.run(CloudStreamGatewayApplication.class, args);
StreamGateway gateway = applicationContext.getBean(StreamGateway.class);
String result = gateway.process("foo");
System.out.println(result);
}
}
application.yml
:
spring:
cloud:
stream:
bindings:
input:
destination: requests
output:
destination: replies
request:
destination: requests
reply:
destination: replies
我用spring-cloud-starter-stream-rabbit
.
MessageBuilder.withPayload(request.getPayload().toUpperCase())
.copyHeaders(request.getHeaders())
.build()
将技巧复制请求 header 发送到回复消息。因此,网关能够在回复端将 header 中的通道标识符转换为适当的 TemporaryReplyChannel
以将回复正确地传送给网关的调用者。
关于此事的 SCSt 问题:https://github.com/spring-cloud/spring-cloud-stream/issues/815
在 Artem 的帮助下,我找到了我一直在寻找的解决方案。我已经将 Artem 发布的代码分成两个服务,一个网关服务和一个 CloudStream 服务。我还添加了一个 @RestController
用于测试目的。这基本上模仿了我想用持久队列做的事情。感谢 Artem 的帮助!我真的很感谢你的时间!我希望这可以帮助其他想要做同样事情的人。
网关代码
package com.example.demo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.dsl.HeaderEnricherSpec;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
@EnableBinding({GatewayApplication.GatewayChannels.class})
@SpringBootApplication
public class GatewayApplication {
interface GatewayChannels {
String TO_UPPERCASE_REPLY = "to-uppercase-reply";
String TO_UPPERCASE_REQUEST = "to-uppercase-request";
@Input(TO_UPPERCASE_REPLY)
SubscribableChannel toUppercaseReply();
@Output(TO_UPPERCASE_REQUEST)
MessageChannel toUppercaseRequest();
}
@MessagingGateway
public interface StreamGateway {
@Gateway(requestChannel = ENRICH, replyChannel = GatewayChannels.TO_UPPERCASE_REPLY)
String process(String payload);
}
private static final String ENRICH = "enrich";
public static void main(String[] args) {
SpringApplication.run(GatewayApplication.class, args);
}
@Bean
public IntegrationFlow headerEnricherFlow() {
return IntegrationFlows.from(ENRICH).enrichHeaders(HeaderEnricherSpec::headerChannelsToString)
.channel(GatewayChannels.TO_UPPERCASE_REQUEST).get();
}
@RestController
public class UppercaseController {
@Autowired
StreamGateway gateway;
@GetMapping(value = "/string/{string}",
produces = {MediaType.APPLICATION_JSON_VALUE, MediaType.APPLICATION_XML_VALUE})
public ResponseEntity<String> getUser(@PathVariable("string") String string) {
return new ResponseEntity<String>(gateway.process(string), HttpStatus.OK);
}
}
}
网关配置(application.yml)
spring:
cloud:
stream:
bindings:
to-uppercase-request:
destination: to-uppercase-request
producer:
required-groups: stream-to-uppercase-request
to-uppercase-reply:
destination: to-uppercase-reply
group: gateway-to-uppercase-reply
server:
port: 8080
CloudStream 代码
package com.example.demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.support.MessageBuilder;
@EnableBinding({CloudStreamApplication.CloudStreamChannels.class})
@SpringBootApplication
public class CloudStreamApplication {
interface CloudStreamChannels {
String TO_UPPERCASE_REPLY = "to-uppercase-reply";
String TO_UPPERCASE_REQUEST = "to-uppercase-request";
@Output(TO_UPPERCASE_REPLY)
SubscribableChannel toUppercaseReply();
@Input(TO_UPPERCASE_REQUEST)
MessageChannel toUppercaseRequest();
}
public static void main(String[] args) {
SpringApplication.run(CloudStreamApplication.class, args);
}
@StreamListener(CloudStreamChannels.TO_UPPERCASE_REQUEST)
@SendTo(CloudStreamChannels.TO_UPPERCASE_REPLY)
public Message<?> process(Message<String> request) {
return MessageBuilder.withPayload(request.getPayload().toUpperCase())
.copyHeaders(request.getHeaders()).build();
}
}
CloudStream 配置 (application.yml)
spring:
cloud:
stream:
bindings:
to-uppercase-request:
destination: to-uppercase-request
group: stream-to-uppercase-request
to-uppercase-reply:
destination: to-uppercase-reply
producer:
required-groups: gateway-to-uppercase-reply
server:
port: 8081