Spring集成3.0版本:根据内容将TCP流消息拆分为多个消息
Spring Integration version 3.0: Splitting TCP stream message into multiple Messages based on content
我正在使用 spring 集成框架连接到一些遗留服务器套接字。
下面是我的客户工厂和适配器:
<int-ip:tcp-connection-factory id="client"
type="client"
host="${tcpServer}"
port="${tcpPort}"
single-use="false"
using-nio="false" />
<int-ip:tcp-inbound-channel-adapter id="inboundServer"
client-mode="true"
channel="inputStream"
error-channel="errorChannel"
retry-interval="${retryInterval}"
connection-factory="client" />
在流到字符串转换器下面:
<int:transformer id="clientBytes2String"
input-channel="inputStream"
output-channel="inputString"
expression="new String(payload)" />
<int:channel id="inputString" />
下面的部分是空的,因为我不确定在这里要实现什么,以便它可以调用我的路由器,路由器会处理它的业务。
我已经尝试使用拆分器,它确实有效,如果流以 "ABCD EFGH WXYZ" 或 "ABCD" 的所需格式出现,但如果流以 "ABCD XXXX EFGH WXYZ" 格式出现,则它会失败。期望的结果是它应该处理 3 条消息和 1 个错误。但它处理了 1 条消息,其余的都被忽略了。
代码如下:
<int:splitter input-channel="inputString"
output-channel="preRouter2"
method="splitMessage"
ref="messageSplitterBean"/>
和MessageSpliterBeanclass如下:
@Splitter
public List<Message<?>> splitMessage( Message<?> message ) {
List<Message<?>> msgFragments = new ArrayList<Message<?>>();
//Let say I am assuming message will be coming ABCD EFGH WXYZ
String str[] = message.getPayload().toString().split(" ");
int counter = 1;
for ( String s : str ) {
Message<String> resultMessage = MessageBuilder.withPayload( s )
.copyHeaders(message.getHeaders())
.build();
msgFragments.add(resultMessage);
}
return msgFragments;
}
下面是我的路由器,它会根据一些表达式发送到相应的频道:
<int:recipient-list-router id="customRouter" input-channel="preRouter2">
<int:recipient channel="input1" selector-expression="payload.toString().startsWith('ABCD')"/>
<int:recipient channel="input2" selector-expression="payload.toString().startsWith('EFGH')"/>
<int:recipient channel="input3" selector-expression="payload.toString().startsWith('WXYZ')"/>
需要您对此的专家意见:关于我做错了什么,或者什么是最好的方法。
来自服务器套接字的输入将是固定长度的数据流,space作为分隔符。每个固定长度我都需要将它们转换成消息并发送到相关频道。
此致。
首先没有理由实现你自己的 Splitter
因为默认的有 delimiters
选项:
我的另一个观点是关于 byte[] -> String
的冗余 <int:transformer>
。 Spring 集成为您提供 ObjectToStringTransformer
开箱即用的功能。
您的问题是 <int:recipient-list-router>
。正如你所说,你的数据可能有问题,你的 router
还没有准备好处理这样的消息,它对你来说失败了。那只是因为 (AbstractMessageRouter
):
else {
throw new MessageDeliveryException(message, "No channel resolved by router '" + this.getComponentName()
+ "' and no 'defaultOutputChannel' defined.");
}
当没有人 selector-expression
接受您的错误消息时会发生这种情况。
在这种情况下,它被发送到 <int-ip:tcp-inbound-channel-adapter>
上的 error-channel="errorChannel"
并且它停止进一步的处理只是因为 MessageDeliveryException
.
如您所见,问题似乎已得到解决,您应该将 default-output-channel
添加到 recipient-list-router
配置中。
我正在使用 spring 集成框架连接到一些遗留服务器套接字。
下面是我的客户工厂和适配器:
<int-ip:tcp-connection-factory id="client"
type="client"
host="${tcpServer}"
port="${tcpPort}"
single-use="false"
using-nio="false" />
<int-ip:tcp-inbound-channel-adapter id="inboundServer"
client-mode="true"
channel="inputStream"
error-channel="errorChannel"
retry-interval="${retryInterval}"
connection-factory="client" />
在流到字符串转换器下面:
<int:transformer id="clientBytes2String"
input-channel="inputStream"
output-channel="inputString"
expression="new String(payload)" />
<int:channel id="inputString" />
下面的部分是空的,因为我不确定在这里要实现什么,以便它可以调用我的路由器,路由器会处理它的业务。
我已经尝试使用拆分器,它确实有效,如果流以 "ABCD EFGH WXYZ" 或 "ABCD" 的所需格式出现,但如果流以 "ABCD XXXX EFGH WXYZ" 格式出现,则它会失败。期望的结果是它应该处理 3 条消息和 1 个错误。但它处理了 1 条消息,其余的都被忽略了。
代码如下:
<int:splitter input-channel="inputString"
output-channel="preRouter2"
method="splitMessage"
ref="messageSplitterBean"/>
和MessageSpliterBeanclass如下:
@Splitter
public List<Message<?>> splitMessage( Message<?> message ) {
List<Message<?>> msgFragments = new ArrayList<Message<?>>();
//Let say I am assuming message will be coming ABCD EFGH WXYZ
String str[] = message.getPayload().toString().split(" ");
int counter = 1;
for ( String s : str ) {
Message<String> resultMessage = MessageBuilder.withPayload( s )
.copyHeaders(message.getHeaders())
.build();
msgFragments.add(resultMessage);
}
return msgFragments;
}
下面是我的路由器,它会根据一些表达式发送到相应的频道:
<int:recipient-list-router id="customRouter" input-channel="preRouter2">
<int:recipient channel="input1" selector-expression="payload.toString().startsWith('ABCD')"/>
<int:recipient channel="input2" selector-expression="payload.toString().startsWith('EFGH')"/>
<int:recipient channel="input3" selector-expression="payload.toString().startsWith('WXYZ')"/>
需要您对此的专家意见:关于我做错了什么,或者什么是最好的方法。
来自服务器套接字的输入将是固定长度的数据流,space作为分隔符。每个固定长度我都需要将它们转换成消息并发送到相关频道。
此致。
首先没有理由实现你自己的 Splitter
因为默认的有 delimiters
选项:
我的另一个观点是关于 byte[] -> String
的冗余 <int:transformer>
。 Spring 集成为您提供 ObjectToStringTransformer
开箱即用的功能。
您的问题是 <int:recipient-list-router>
。正如你所说,你的数据可能有问题,你的 router
还没有准备好处理这样的消息,它对你来说失败了。那只是因为 (AbstractMessageRouter
):
else {
throw new MessageDeliveryException(message, "No channel resolved by router '" + this.getComponentName()
+ "' and no 'defaultOutputChannel' defined.");
}
当没有人 selector-expression
接受您的错误消息时会发生这种情况。
在这种情况下,它被发送到 <int-ip:tcp-inbound-channel-adapter>
上的 error-channel="errorChannel"
并且它停止进一步的处理只是因为 MessageDeliveryException
.
如您所见,问题似乎已得到解决,您应该将 default-output-channel
添加到 recipient-list-router
配置中。