Spring 集成 DSL 中使用通道和网关的 Echo 套接字服务
Echo socket service in Spring Integration DSL using Channels and Gateways
这是我的问题的变体 。介绍了一个很好的工作解决方案,但我想探索替代方案。我特别感兴趣的是基于在客户端和服务器实现中显式使用入站和出站通道的解决方案。这可能吗?
到目前为止我能够想出:
HeartbeatClientConfig
...
@Bean
public IntegrationFlow heartbeatClientFlow(
TcpNetClientConnectionFactory clientConnectionFactory,
MessageChannel outboundChannel,
PollableChannel inboundChannel) {
return IntegrationFlows
.from(outboundChannel)
.handle(Tcp.outboundGateway(clientConnectionFactory))
.channel(inboundChannel)
.get();
}
...
HeartbeatClient
public HeartbeatClient(MessageChannel outboudChannel, PollableChannel inboundChannel) {
this.inboundChannel = inboundChannel;
this.outboudChannel = outboudChannel;
}
...
void run() {
// ..in scheduled intervals in loop
outboudChannel.send(new GenericMessage<String>("status"));
Message<?> message = inboundChannel.receive(1000);
}
客户端部分似乎工作正常。问题出在服务器端。
心跳服务器
public HeartbeatServer(PollableChannel inboundChannel, MessageChannel outboudChannel) {
this.inboundChannel = inboundChannel;
this.outboudChannel = outboudChannel;
}
...
void run() {
// ..in some kind of loop
Message<?> message = inboundChannel.receive(1000); // presumably a blocking call
...
outboudChannel.send(new GenericMessage<>("OK"));
...
}
HeartbeatServerConfig
这是最棘手的部分,我确定我错了。我只是不知道我应该做什么。在这里,我天真地使用客户端实现的逆向方法,它似乎在工作;在流定义中切换入站和出站通道的意义上相反。
...
@Bean
public IntegrationFlow heartbeatServerFlow(
MessageChannel outboundChannel,
PollableChannel inboundChannel) {
return IntegrationFlows
.from(inboundChannel)
.handle(Tcp.inboundGateway(Tcp.netServer(7777)))
.channel(outboundChannel)
.get();
}
...
服务器不工作,抛出关于 Found ambiguous parameter type [class java.lang.Boolean] for method match ...
的神秘异常,后跟一长串 Spring 和 Spring 集成方法。
您无法使用频道启动 server-side 流程。
流程从网关开始;它处理所有套接字通信。当它收到一条消息时,它会将它发送到一个频道。
你可以这样做...
@Bean
public IntegrationFlow server(PollableChannel requests, MessageChannel replies) {
return IntegrationFlows.from(Tcp.inboundGateway(Tcp.netServer(1234))
.replyChannel(replies))
.transform(Transformers.objectToString())
.channel(requests)
.get();
}
但我想问你为什么要这样做,因为现在你必须管理自己的线程以从请求通道接收并写入回复通道。为了使其工作,必须将请求消息 中的 replyChannel
header 复制到回复消息 中。事实上,你真的不需要回复渠道;您可以直接将回复发送到 replyChannel
header(这是内部发生的事情,我们将回复通道桥接到 header 通道)。
在网关的线程上处理请求就简单多了。
为了补充 Gary 的完美答案,如果有人感兴趣,这里是完整的代码。
我必须明确指定 TcpNetServerConnectionFactory
,才能将 ByteArrayLengthHeaderSerializer
设置为 serializer/deserializer。没有它就无法工作。
HeartbeatServerConfig full code
@Bean
public TcpNetServerConnectionFactory connectionFactory() {
TcpNetServerConnectionFactory connectionFactory = new TcpNetServerConnectionFactory(7777);
connectionFactory.setSerializer(new ByteArrayLengthHeaderSerializer());
connectionFactory.setDeserializer(new ByteArrayLengthHeaderSerializer());
return connectionFactory;
}
@Bean
public IntegrationFlow heartbeatServerFlow(
TcpNetServerConnectionFactory connectionFactory,
PollableChannel inboundChannel,
MessageChannel outboundChannel) {
return IntegrationFlows.from(Tcp.inboundGateway(connectionFactory)
.replyChannel(outboundChannel))
.channel(inboundChannel)
.get();
}
心跳服务器 full code
public void start() {
Executors.newSingleThreadExecutor().execute(() -> {
while (true) {
try {
Message<?> request = inboundChannel.receive();
if (request == null) {
log.error("Heartbeat timeouted");
} else {
MessageChannel outboudChannel = (MessageChannel)request.getHeaders().getReplyChannel();
String requestPayload = new String((byte[]) request.getPayload());
if (requestPayload.equals("status")) {
log.info("Heartbeat received");
outboudChannel.send(new GenericMessage<>("OK"));
} else {
log.error("Unexpected message content from client: " + requestPayload);
}
}
} catch (Exception e) {
log.error(e);
}
}
});
}
关键当然是从请求消息本身获取出站通道:MessageChannel outboudChannel = (MessageChannel)request.getHeaders().getReplyChannel()
可在此处找到完整代码。
这是我的问题的变体
到目前为止我能够想出:
HeartbeatClientConfig
...
@Bean
public IntegrationFlow heartbeatClientFlow(
TcpNetClientConnectionFactory clientConnectionFactory,
MessageChannel outboundChannel,
PollableChannel inboundChannel) {
return IntegrationFlows
.from(outboundChannel)
.handle(Tcp.outboundGateway(clientConnectionFactory))
.channel(inboundChannel)
.get();
}
...
HeartbeatClient
public HeartbeatClient(MessageChannel outboudChannel, PollableChannel inboundChannel) {
this.inboundChannel = inboundChannel;
this.outboudChannel = outboudChannel;
}
...
void run() {
// ..in scheduled intervals in loop
outboudChannel.send(new GenericMessage<String>("status"));
Message<?> message = inboundChannel.receive(1000);
}
客户端部分似乎工作正常。问题出在服务器端。
心跳服务器
public HeartbeatServer(PollableChannel inboundChannel, MessageChannel outboudChannel) {
this.inboundChannel = inboundChannel;
this.outboudChannel = outboudChannel;
}
...
void run() {
// ..in some kind of loop
Message<?> message = inboundChannel.receive(1000); // presumably a blocking call
...
outboudChannel.send(new GenericMessage<>("OK"));
...
}
HeartbeatServerConfig
这是最棘手的部分,我确定我错了。我只是不知道我应该做什么。在这里,我天真地使用客户端实现的逆向方法,它似乎在工作;在流定义中切换入站和出站通道的意义上相反。
...
@Bean
public IntegrationFlow heartbeatServerFlow(
MessageChannel outboundChannel,
PollableChannel inboundChannel) {
return IntegrationFlows
.from(inboundChannel)
.handle(Tcp.inboundGateway(Tcp.netServer(7777)))
.channel(outboundChannel)
.get();
}
...
服务器不工作,抛出关于 Found ambiguous parameter type [class java.lang.Boolean] for method match ...
的神秘异常,后跟一长串 Spring 和 Spring 集成方法。
您无法使用频道启动 server-side 流程。
流程从网关开始;它处理所有套接字通信。当它收到一条消息时,它会将它发送到一个频道。
你可以这样做...
@Bean
public IntegrationFlow server(PollableChannel requests, MessageChannel replies) {
return IntegrationFlows.from(Tcp.inboundGateway(Tcp.netServer(1234))
.replyChannel(replies))
.transform(Transformers.objectToString())
.channel(requests)
.get();
}
但我想问你为什么要这样做,因为现在你必须管理自己的线程以从请求通道接收并写入回复通道。为了使其工作,必须将请求消息 中的 replyChannel
header 复制到回复消息 中。事实上,你真的不需要回复渠道;您可以直接将回复发送到 replyChannel
header(这是内部发生的事情,我们将回复通道桥接到 header 通道)。
在网关的线程上处理请求就简单多了。
为了补充 Gary 的完美答案,如果有人感兴趣,这里是完整的代码。
我必须明确指定 TcpNetServerConnectionFactory
,才能将 ByteArrayLengthHeaderSerializer
设置为 serializer/deserializer。没有它就无法工作。
HeartbeatServerConfig full code
@Bean
public TcpNetServerConnectionFactory connectionFactory() {
TcpNetServerConnectionFactory connectionFactory = new TcpNetServerConnectionFactory(7777);
connectionFactory.setSerializer(new ByteArrayLengthHeaderSerializer());
connectionFactory.setDeserializer(new ByteArrayLengthHeaderSerializer());
return connectionFactory;
}
@Bean
public IntegrationFlow heartbeatServerFlow(
TcpNetServerConnectionFactory connectionFactory,
PollableChannel inboundChannel,
MessageChannel outboundChannel) {
return IntegrationFlows.from(Tcp.inboundGateway(connectionFactory)
.replyChannel(outboundChannel))
.channel(inboundChannel)
.get();
}
心跳服务器 full code
public void start() {
Executors.newSingleThreadExecutor().execute(() -> {
while (true) {
try {
Message<?> request = inboundChannel.receive();
if (request == null) {
log.error("Heartbeat timeouted");
} else {
MessageChannel outboudChannel = (MessageChannel)request.getHeaders().getReplyChannel();
String requestPayload = new String((byte[]) request.getPayload());
if (requestPayload.equals("status")) {
log.info("Heartbeat received");
outboudChannel.send(new GenericMessage<>("OK"));
} else {
log.error("Unexpected message content from client: " + requestPayload);
}
}
} catch (Exception e) {
log.error(e);
}
}
});
}
关键当然是从请求消息本身获取出站通道:MessageChannel outboudChannel = (MessageChannel)request.getHeaders().getReplyChannel()
可在此处找到完整代码。