Spring集成3.0版本:根据内容将TCP流消息拆分为多个消息

Spring Integration version 3.0: Splitting TCP stream message into multiple Messages based on content

我正在使用 spring 集成框架连接到一些遗留服务器套接字。

下面是我的客户工厂和适配器:

<int-ip:tcp-connection-factory id="client"
                               type="client" 
                               host="${tcpServer}" 
                               port="${tcpPort}" 
                               single-use="false"
                               using-nio="false" />

<int-ip:tcp-inbound-channel-adapter id="inboundServer"
                                    client-mode="true"
                                    channel="inputStream" 
                                    error-channel="errorChannel"
                                    retry-interval="${retryInterval}"
                                    connection-factory="client" />

在流到字符串转换器下面:

    <int:transformer id="clientBytes2String" 
                 input-channel="inputStream"
                 output-channel="inputString" 
                 expression="new String(payload)" />

                 <int:channel id="inputString" />

下面的部分是空的,因为我不确定在这里要实现什么,以便它可以调用我的路由器,路由器会处理它的业务。


我已经尝试使用拆分器,它确实有效,如果流以 "ABCD EFGH WXYZ" 或 "ABCD" 的所需格式出现,但如果流以 "ABCD XXXX EFGH WXYZ" 格式出现,则它会失败。期望的结果是它应该处理 3 条消息和 1 个错误。但它处理了 1 条消息,其余的都被忽略了。

代码如下:

 <int:splitter input-channel="inputString"
              output-channel="preRouter2"                  
              method="splitMessage"
              ref="messageSplitterBean"/>

和MessageSpliterBeanclass如下:

@Splitter
public List<Message<?>> splitMessage( Message<?> message ) {

    List<Message<?>> msgFragments = new ArrayList<Message<?>>();

    //Let say I am assuming message will be coming ABCD EFGH WXYZ
    String str[] = message.getPayload().toString().split(" ");
    int counter = 1;
    for ( String s : str ) {

        Message<String> resultMessage = MessageBuilder.withPayload( s )
                .copyHeaders(message.getHeaders())
                .build();
        msgFragments.add(resultMessage);
    }

    return msgFragments;
}

下面是我的路由器,它会根据一些表达式发送到相应的频道:

<int:recipient-list-router id="customRouter" input-channel="preRouter2">
<int:recipient channel="input1" selector-expression="payload.toString().startsWith('ABCD')"/>   
<int:recipient channel="input2" selector-expression="payload.toString().startsWith('EFGH')"/>
<int:recipient channel="input3" selector-expression="payload.toString().startsWith('WXYZ')"/>

需要您对此的专家意见:关于我做错了什么,或者什么是最好的方法。

来自服务器套接字的输入将是固定长度的数据流,space作为分隔符。每个固定长度我都需要将它们转换成消息并发送到相关频道。

此致。

首先没有理由实现你自己的 Splitter 因为默认的有 delimiters 选项:

我的另一个观点是关于 byte[] -> String 的冗余 <int:transformer>。 Spring 集成为您提供 ObjectToStringTransformer 开箱即用的功能。

您的问题是 <int:recipient-list-router>。正如你所说,你的数据可能有问题,你的 router 还没有准备好处理这样的消息,它对你来说失败了。那只是因为 (AbstractMessageRouter):

else {
    throw new MessageDeliveryException(message, "No channel resolved by router '" + this.getComponentName()
                + "' and no 'defaultOutputChannel' defined.");
}

当没有人 selector-expression 接受您的错误消息时会发生这种情况。

在这种情况下,它被发送到 <int-ip:tcp-inbound-channel-adapter> 上的 error-channel="errorChannel" 并且它停止进一步的处理只是因为 MessageDeliveryException.

如您所见,问题似乎已得到解决,您应该将 default-output-channel 添加到 recipient-list-router 配置中。