MTOM attachment streaming into a channel

I am using Spring Integration's int-ws:outbound-gateway to read files (PDF, ...) from a web service via XOP/MTOM.

It works and correctly gets the streams from the attachments, and I can write them to disk.

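But S.I./spring-ws keeps the files in memory while transferring them: 30 files of 60 MB each fill the memory and the run fails (even though the calling method reads the stream through a buffered stream, 128 kB at a time).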

I would like to be able to transfer N files where N * fileSize > availableMemory, by streaming smaller chunks.
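To make the goal concrete, here is a minimal sketch of what "streaming smaller chunks" means on the consuming side, using only the JDK. The class name ChunkedCopy and its copy method are hypothetical helpers, not part of Spring Integration or spring-ws. It only keeps memory bounded if the InputStream it is given is itself backed by the network rather than by an in-memory copy.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/** Hypothetical helper: copies a stream through one fixed-size buffer. */
final class ChunkedCopy {

    private ChunkedCopy() {}

    /**
     * Copies everything from in to out, reusing a single buffer of chunkSize bytes,
     * so this method never holds more than chunkSize bytes of payload at once.
     * Returns the total number of bytes copied.
     */
    static long copy(InputStream in, OutputStream out, int chunkSize) throws IOException {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        byte[] buffer = new byte[chunkSize];
        long total = 0;
        int read;
        // read() returns -1 at end of stream; it may return fewer bytes than requested
        while ((read = in.read(buffer)) != -1) {
            out.write(buffer, 0, read);
            total += read;
        }
        out.flush();
        return total;
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for an attachment stream; a real one would come from the DataHandler
        byte[] fakeAttachment = new byte[300_000];
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        long copied = copy(new ByteArrayInputStream(fakeAttachment), sink, 128 * 1024);
        System.out.println("copied " + copied + " bytes");
    }
}
```

With N concurrent transfers the buffers cost roughly N * chunkSize, independent of file size, which is the behaviour I am after.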

The XML (and the response object) contains other information such as a title; "file" is an InputStream obtained from a DataHandler.

EDIT: This is not Spring Integration's fault. It also happens with a standalone spring-ws client.

My bean:

<bean id="mtomMarshaller" class="org.springframework.oxm.jaxb.Jaxb2Marshaller">
      <property name="contextPath" value="aaaaa.bb.filestreamer" />
      <property name="mtomEnabled" value="true" />
</bean>

My S.I. configuration:

<int:logging-channel-adapter id="logger" level="DEBUG" log-full-message="true"/>

<!-- All my channels are like this: -->
<int:channel id="singleFileRespChannel"><int:interceptors><int:wire-tap channel="logger"/></int:interceptors></int:channel>

<int:gateway id="filewsEntry"
        service-interface="aaaaa.ws.FileServiceGateway"
        default-reply-timeout="5000">
    <int:method name="getSingleFile" request-channel="singleFileReqChannel" reply-channel="singleFileRespChannel" />
</int:gateway>

<int-ws:outbound-gateway id="singleFileMarshallingGateway"
    request-channel="singleFileReqChannel" reply-channel="singleFileRespChannel"
    requires-reply="true"
    uri="${filews.singleFile.wsURI}" marshaller="mtomMarshaller"
    unmarshaller="mtomMarshaller" >
</int-ws:outbound-gateway>

I also tried Axiom:

<bean id="axiomMessageFactory" class="org.springframework.ws.soap.axiom.AxiomSoapMessageFactory">
    <property name="payloadCaching" value="false" />
    <property name="attachmentCaching" value="false" />
    <property name="attachmentCacheThreshold" value="1024"/>
    <property name="attachmentCacheDir" value="D:/tmp/cache"></property>
</bean>

But apart from the file being cached on disk, the result is the same: the JVM always uses the same amount of memory (heap).

I thought about an S.I. splitter that would return the chunks in a map... but to do that it would have to read the whole file anyway.

Is it possible to pass the result "streamed" through the Spring Integration channels to the caller, without loading it all into memory?

Any idea how to achieve this?

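EDIT: adding the requested information: in a JUnit @Test I call the service inside Runnable.run() (launching 30 threads) to simulate concurrent client requests. Memory usage grows to the maximum (2 GB), which means all the files are being held in memory: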

The relevant part of the caller code is:

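// imports needed by this fragment
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

try {
    FileResponse sfresp = service.getSingleFile(sfreq); // call the S.I. gateway

    logger.warn("SINGLEFILE STREAM: " + sfreq.getFile().getFileId());

    logger.warn("writing the stream");
    byte[] buffer = new byte[1024];
    // try-with-resources closes the stream even if read() throws
    try (InputStream in = new BufferedInputStream(
            sfresp.getFileAttachment().getFile().getInputStream())) {
        int bytesRead = in.read(buffer);
        // decode only the bytes actually read; read() returns -1 at end of stream
        String preview = bytesRead > 0
                ? new String(buffer, 0, bytesRead, StandardCharsets.US_ASCII)
                : "";
        logger.warn("just read " + bytesRead + " Bytes from file : " + preview);
    } catch (IOException e) {
        logger.error("FILE I/O ERROR: " + e.getLocalizedMessage());
    }
}catch....etc etc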
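For completeness, the threading around that fragment can be sketched roughly as below. This is not my actual test: ConcurrentClients and runConcurrently are hypothetical names, and the task passed in stands for one call to the gateway followed by the read shown above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical harness: runs the same client call from several threads at once. */
final class ConcurrentClients {

    private ConcurrentClients() {}

    /**
     * Submits `clients` copies of the task to a pool with one thread per client,
     * so all of them can be in flight at the same time, then waits for every result.
     */
    static <T> List<T> runConcurrently(int clients, Callable<T> task)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(clients);
        try {
            List<Callable<T>> tasks = new ArrayList<>();
            for (int i = 0; i < clients; i++) {
                tasks.add(task);
            }
            List<T> results = new ArrayList<>();
            // invokeAll blocks until every task has completed or failed
            for (Future<T> future : pool.invokeAll(tasks)) {
                results.add(future.get()); // rethrows a task failure as ExecutionException
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in task; in the real test this would call the gateway and read the stream
        List<String> names = runConcurrently(30, () -> Thread.currentThread().getName());
        System.out.println("completed calls: " + names.size());
    }
}
```

Watching the heap while such a run is in progress is how I see usage climb towards the 2 GB limit.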

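The first result appears only after a long time (30 seconds), so the endpoint seems to fetch the whole file before handing the stream back to the caller.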

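EDIT

I debugged for a while. I don't understand why it fetches the entire 60 MB file instead of the few bytes I requested, and it does so even before I start reading the stream: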

2015-09-28 11:20:32,725|WebServiceTemplate|Sent request [AxiomSoapMessage]
2015-09-28 11:20:32,741|OMOutputFormat|Start getContentType: OMOutputFormat [ mimeBoundary =null rootContentId=null doOptimize=false doingSWA=false isSOAP11=true charSetEncoding=UTF-8 xmlVersion=null contentType=null ignoreXmlDeclaration=false autoCloseWriter=false actionProperty=null optimizedThreshold=0]
2015-09-28 11:20:32,741|OMOutputFormat|getContentType= {text/xml}   OMOutputFormat [ mimeBoundary =null rootContentId=null doOptimize=false doingSWA=false isSOAP11=true charSetEncoding=UTF-8 xmlVersion=null contentType=text/xml ignoreXmlDeclaration=false autoCloseWriter=false actionProperty=null optimizedThreshold=0]
2015-09-28 11:20:32,741|MTOMXMLStreamWriter|Creating MTOMXMLStreamWriter
2015-09-28 11:20:32,741|MTOMXMLStreamWriter|OutputStream =class org.springframework.ws.transport.AbstractSenderConnection$RequestTransportOutputStream
2015-09-28 11:20:32,741|MTOMXMLStreamWriter|OMFormat = OMOutputFormat [ mimeBoundary =null rootContentId=null doOptimize=false doingSWA=false isSOAP11=true charSetEncoding=UTF-8 xmlVersion=null contentType=text/xml ignoreXmlDeclaration=false autoCloseWriter=false actionProperty=null optimizedThreshold=0]
2015-09-28 11:20:32,741|MTOMXMLStreamWriter|preserveAttachments = false
2015-09-28 11:20:32,741|StAXUtils|XMLStreamWriter is org.apache.axiom.util.stax.dialect.Woodstox4StreamWriterWrapper
2015-09-28 11:20:32,741|OMDataSourceExtBase|serialize xmlWriter=org.apache.axiom.om.impl.MTOMXMLStreamWriter@3799c784
2015-09-28 11:20:32,741|MTOMXMLStreamWriter|Returning access to the original output stream: org.springframework.ws.transport.AbstractSenderConnection$RequestTransportOutputStream@50f9c7a0
2015-09-28 11:20:32,741|MTOMXMLStreamWriter|Calling MTOMXMLStreamWriter.flush
2015-09-28 11:20:32,788|OMDataSourceExtBase|serialize OutputStream optimisation: true
2015-09-28 11:20:32,788|OMDataSourceExtBase|serialize output=org.springframework.ws.transport.AbstractSenderConnection$RequestTransportOutputStream@50f9c7a0 format=OMOutputFormat [ mimeBoundary =null rootContentId=null doOptimize=false doingSWA=false isSOAP11=true charSetEncoding=UTF-8 xmlVersion=null contentType=null ignoreXmlDeclaration=false autoCloseWriter=false actionProperty=null optimizedThreshold=0]
2015-09-28 11:20:32,788|ByteArrayDataSource|getXMLBytes encoding=UTF-8
2015-09-28 11:20:32,788|SOAPEnvelopeImpl|Could not close builder or parser due to:
2015-09-28 11:20:32,788|SOAPEnvelopeImpl|builder is null
2015-09-28 11:20:32,788|MTOMXMLStreamWriter|Calling MTOMXMLStreamWriter.flush
2015-09-28 11:20:32,788|MTOMXMLStreamWriter|close
*****
HERE IT STOPS WHILE TRANSFERRING THE DOC OVER NETWORK
*****
2015-09-28 11:20:38,322|MIMEMessage|Attachments contentLength=0, contentTypeString=Multipart/Related; start-info="text/xml"; type="application/xop+xml"; boundary="----=_Part_434_1482047736.1443432076967"
2015-09-28 11:20:38,342|MIMEMessage|getRootPartContentID rootContentID=null
2015-09-28 11:20:38,342|MIMEMessage|readHeaders
2015-09-28 11:20:38,342|MIMEMessage|addHeader: (Content-Type) value=(application/xop+xml; charset=utf-8; type="text/xml")
2015-09-28 11:20:38,342|PartImpl|getHeader name=(content-id) value=(null)
2015-09-28 11:20:38,358|MIMEMessage|getRootPartContentID rootContentID=null
2015-09-28 11:20:38,358|PartImpl|Using blob of type org.apache.axiom.blob.MemoryBlobImpl
2015-09-28 11:20:38,358|PartImpl|getHeader name=(Content-Transfer-Encoding) value=(null)
2015-09-28 11:20:38,358|DebugInputStream|EOF reached after reading 516 bytes in 1 chunks
2015-09-28 11:20:38,358|MIMEMessage|getRootPartContentID rootContentID=null
2015-09-28 11:20:38,358|PartImpl|getHeader name=(content-type) value=(application/xop+xml; charset=utf-8; type="text/xml")
2015-09-28 11:20:38,389|DefaultOMMetaFactoryLocator|Starting class path based discovery
2015-09-28 11:20:38,389|ImplementationFactory|Loading jar:file:/C:/Users/Max/.m2/repository/org/apache/ws/commons/axiom/axiom-impl/1.2.15/axiom-impl-1.2.15.jar!/META-INF/axiom.xml
2015-09-28 11:20:38,389|ImplementationFactory|Discovered implementations: [llom(metaFactory=org.apache.axiom.om.impl.llom.factory.OMLinkedListMetaFactory,features=[default(priority=100)])]
2015-09-28 11:20:38,405|PriorityBasedOMMetaFactoryLocator|Meta factories:
  default: org.apache.axiom.om.impl.llom.factory.OMLinkedListMetaFactory
2015-09-28 11:20:38,405|StAXSOAPModelBuilder|Starting to process SOAP 1.1 message
2015-09-28 11:20:38,405|WebServiceTemplate|Received response [AxiomSoapMessage] for request [AxiomSoapMessage]
2015-09-28 11:20:38,405|PullSerializer|Pull serializer created; initial state is org.apache.axiom.om.impl.common.serializer.pull.Navigator@41ee0498[cache=false,document=false]
2015-09-28 11:20:38,421|StAXBuilder|Caching disabled; current element level is 3
2015-09-28 11:20:38,421|Navigator|Switching to pull-through mode; first event is START_ELEMENT; depth is 1
2015-09-28 11:20:38,421|PullSerializer|Switching to state org.apache.axiom.om.impl.common.serializer.pull.PullThroughWrapper@35c7d3d3[reader=org.apache.axiom.util.stax.xop.XOPDecodingStreamReader@5223dd3a]
2015-09-28 11:20:38,421|XOPDecodingStreamReader|processXopInclude - found href : cid:e8e49803-f357-4757-bcde-6c99abb48cf4%40ws.xxxx.com
2015-09-28 11:20:38,421|XOPDecodingStreamReader|processXopInclude - decoded contentID : e8e49803-f357-4757-bcde-6c99abb48cf4@ws.xxxx.com
2015-09-28 11:20:38,421|XOPDecodingStreamReader|Encountered xop:Include for content ID 'e8e49803-f357-4757-bcde-6c99abb48cf4@ws.xxxx.com'
2015-09-28 11:20:38,421|MIMEMessage|readHeaders
2015-09-28 11:20:38,421|MIMEMessage|addHeader: (Content-Type) value=(application/octet-stream)
2015-09-28 11:20:38,421|MIMEMessage|addHeader: (Content-ID) value=(<e8e49803-f357-4757-bcde-6c99abb48cf4@ws.xxxx.com>)
2015-09-28 11:20:38,421|MIMEMessage|addHeader: (Content-Transfer-Encoding) value=(binary)
2015-09-28 11:20:38,421|PartImpl|getHeader name=(content-id) value=(<e8e49803-f357-4757-bcde-6c99abb48cf4@ws.xxxx.com>)
2015-09-28 11:20:38,421|PartImpl|Using blob of type org.apache.axiom.blob.MemoryBlobImpl
2015-09-28 11:20:38,421|PartImpl|getHeader name=(Content-Transfer-Encoding) value=(binary)
2015-09-28 11:20:38,499|DebugInputStream|EOF reached after reading 64541873 bytes in 15938 chunks
2015-09-28 11:20:39,499|PullSerializer|Restoring state org.apache.axiom.om.impl.common.serializer.pull.Navigator@41ee0498[cache=false,document=false]
2015-09-28 11:20:39,499|StAXBuilder|Caching re-enabled; new element level: 2; done=false
2015-09-28 11:20:39,500|PullSerializer|Switching to state org.apache.axiom.om.impl.common.serializer.pull.EndDocumentState@1e229be
2015-09-28 11:20:39,501|AbstractReplyProducingMessageHandler|handler 'org.springframework.integration.ws.MarshallingWebServiceOutboundGateway#1' sending reply Message: [Payload=xxxx.ws.filestreamer.FileResponse@306a12cf][Headers={timestamp=1443432039501, id=d9ffd3d4-48f3-504e-00a1-9ccd5d1d56e8, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@15160f3c, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@15160f3c, useStub=false}]
2015-09-28 11:20:39,501|AbstractMessageChannel$ChannelInterceptorList|preSend on channel 'singleFileRespChannel', message: [Payload=xxxx.ws.filestreamer.FileResponse@306a12cf][Headers={timestamp=1443432039501, id=d9ffd3d4-48f3-504e-00a1-9ccd5d1d56e8, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@15160f3c, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@15160f3c, useStub=false}]
2015-09-28 11:20:39,501|AbstractMessageHandler|org.springframework.integration.handler.BridgeHandler@3f8ecde received message: [Payload=xxxx.ws.filestreamer.FileResponse@306a12cf][Headers={timestamp=1443432039501, id=d9ffd3d4-48f3-504e-00a1-9ccd5d1d56e8, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@15160f3c, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@15160f3c, useStub=false}]
2015-09-28 11:20:39,501|AbstractReplyProducingMessageHandler|handler 'org.springframework.integration.handler.BridgeHandler@3f8ecde' sending reply Message: [Payload=xxxx.ws.filestreamer.FileResponse@306a12cf][Headers={timestamp=1443432039501, id=d9ffd3d4-48f3-504e-00a1-9ccd5d1d56e8, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@15160f3c, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@15160f3c, useStub=false}]
2015-09-28 11:20:39,501|AbstractMessageChannel$ChannelInterceptorList|postSend (sent=true) on channel 'singleFileRespChannel', message: [Payload=xxxx.ws.filestreamer.FileResponse@306a12cf][Headers={timestamp=1443432039501, id=d9ffd3d4-48f3-504e-00a1-9ccd5d1d56e8, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@15160f3c, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@15160f3c, useStub=false}]
2015-09-28 11:20:39,501|AbstractMessageChannel$ChannelInterceptorList|postSend (sent=true) on channel 'singleFileReqChannel', message: [Payload=xxxx.ws.filestreamer.FileRequest@1f36f64][Headers={timestamp=1443432032632, id=0ce05572-05f6-5e5e-e819-63377ba4e0b1, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@15160f3c, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@15160f3c}]
2015-09-28 11:20:39,501|IntegrationObjectSupport|Unable to attempt conversion of Message payload types. Component 'filewsEntry' has no explicit ConversionService reference, and there is no 'integrationConversionService' bean within the context.
****
HERE THE SERVICE GATEWAY   RETURNS THE CONTROL TO MY JAVA CODE 
****
2015-09-28 11:20:39,501|TestFileWsInvocation|SINGLEFILE STREAM: "myFile"
2015-09-28 11:20:39,501|TestFileWsInvocation|writing the stream
2015-09-28 11:20:39,501|TestFileWsInvocation|just read 1024 Bytes from file : "1234567.......... "

With SAAJ it is the same:

2015-09-28 11:52:28,760|WebServiceAccessor|Opening [org.springframework.ws.transport.http.HttpUrlConnection@71a8da44] to [http://xxxx:8080/filestreamer/ws/]
2015-09-28 11:52:28,791|AbstractHeaderMapper|headerName=[useStub] WILL NOT be mapped
2015-09-28 11:52:28,807|WebServiceTemplate|Sent request [SaajSoapMessage {http://ws.xxxx.com/filestreamer}fileRequest]
******
STOPS WHILE GETTING ALL THE FILE
******
2015-09-28 11:52:34,948|WebServiceTemplate|Received response [SaajSoapMessage {http://ws.xxxx.com/filestreamer}fileResponse] for request [SaajSoapMessage {http://ws.xxxx.com/filestreamer}fileRequest]
2015-09-28 11:52:35,371|AbstractReplyProducingMessageHandler|handler 'org.springframework.integration.ws.MarshallingWebServiceOutboundGateway#1' sending reply Message: [Payload=com.xxxx.ws.filestreamer.FileResponse@797fc155][Headers={timestamp=1443433955371, id=0a41ab57-dac8-6f7e-3ad9-c71a745f5b0a, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@78561bc6, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@78561bc6, useStub=false}]
2015-09-28 11:52:35,371|AbstractMessageChannel$ChannelInterceptorList|preSend on channel 'singleFileRespChannel', message: [Payload=com.xxxx.ws.filestreamer.FileResponse@797fc155][Headers={timestamp=1443433955371, id=0a41ab57-dac8-6f7e-3ad9-c71a745f5b0a, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@78561bc6, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@78561bc6, useStub=false}]
2015-09-28 11:52:35,371|AbstractMessageHandler|org.springframework.integration.handler.BridgeHandler@326df1c4 received message: [Payload=com.xxxx.ws.filestreamer.FileResponse@797fc155][Headers={timestamp=1443433955371, id=0a41ab57-dac8-6f7e-3ad9-c71a745f5b0a, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@78561bc6, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@78561bc6, useStub=false}]
2015-09-28 11:52:35,371|AbstractReplyProducingMessageHandler|handler 'org.springframework.integration.handler.BridgeHandler@326df1c4' sending reply Message: [Payload=com.xxxx.ws.filestreamer.FileResponse@797fc155][Headers={timestamp=1443433955371, id=0a41ab57-dac8-6f7e-3ad9-c71a745f5b0a, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@78561bc6, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@78561bc6, useStub=false}]
2015-09-28 11:52:35,371|AbstractMessageChannel$ChannelInterceptorList|postSend (sent=true) on channel 'singleFileRespChannel', message: [Payload=com.xxxx.ws.filestreamer.FileResponse@797fc155][Headers={timestamp=1443433955371, id=0a41ab57-dac8-6f7e-3ad9-c71a745f5b0a, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@78561bc6, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@78561bc6, useStub=false}]
2015-09-28 11:52:35,371|AbstractMessageChannel$ChannelInterceptorList|postSend (sent=true) on channel 'singleFileWSReqChannel', message: [Payload=com.xxxx.ws.filestreamer.FileRequest@41216889][Headers={timestamp=1443433948760, id=42d187e8-c0c6-4077-1000-8840e2c531ba, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@78561bc6, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@78561bc6, useStub=false}]
2015-09-28 11:52:35,371|AbstractMessageChannel$ChannelInterceptorList|postSend (sent=true) on channel 'singleFileRouterChannel', message: [Payload=com.xxxx.ws.filestreamer.FileRequest@41216889][Headers={timestamp=1443433948760, id=42d187e8-c0c6-4077-1000-8840e2c531ba, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@78561bc6, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@78561bc6, useStub=false}]
2015-09-28 11:52:35,371|AbstractMessageChannel$ChannelInterceptorList|postSend (sent=true) on channel 'singleFileReqChannel', message: [Payload=com.xxxx.ws.filestreamer.FileRequest@41216889][Headers={timestamp=1443433948760, id=6ebcc8b3-0db4-1e67-9619-98716ff362ff, errorChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@78561bc6, replyChannel=org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel@78561bc6}]
2015-09-28 11:52:35,371|IntegrationObjectSupport|Unable to attempt conversion of Message payload types. Component 'filewsEntry' has no explicit ConversionService reference, and there is no 'integrationConversionService' bean within the context.
2015-09-28 11:52:35,371|TestFileWsInvocation|SINGLEFILE STREAM: 0
2015-09-28 11:52:35,371|TestFileWsInvocation|writing the stream
2015-09-28 11:52:35,371|TestFileWsInvocation|just read 1024 Bytes from file : "12345........"

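The problem with SAAJ is that its API does not support streaming of attachments. Apache Axiom, on the other hand, is designed to support this, but there is a design flaw in Spring-WS that prevents it from taking advantage of Axiom's capabilities: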

http://veithen.github.io/2015/10/05/spring-ws-mtom.html
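The difference between a buffered and a streamed attachment can be illustrated with plain JDK streams. This is only a sketch of the two behaviours, not SAAJ, Axiom or Spring-WS code: EagerVsLazy, CountingInputStream, eager and lazy are all hypothetical names. The counter plays the role of the DebugInputStream lines in the log above, showing how many bytes were pulled from the source before the caller reads anything.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

/** Hypothetical illustration of eager buffering versus pass-through streaming. */
final class EagerVsLazy {

    private EagerVsLazy() {}

    /** Counts how many bytes have been pulled from the wrapped stream. */
    static final class CountingInputStream extends FilterInputStream {
        private long count;

        CountingInputStream(InputStream in) {
            super(in);
        }

        @Override
        public int read() throws IOException {
            int b = super.read();
            if (b != -1) {
                count++;
            }
            return b;
        }

        @Override
        public int read(byte[] buf, int off, int len) throws IOException {
            int n = super.read(buf, off, len);
            if (n > 0) {
                count += n;
            }
            return n;
        }

        long count() {
            return count;
        }
    }

    /** Eager: drains the whole source into a heap copy before returning a stream. */
    static InputStream eager(InputStream source) throws IOException {
        ByteArrayOutputStream heapCopy = new ByteArrayOutputStream();
        byte[] buffer = new byte[8192];
        int read;
        while ((read = source.read(buffer)) != -1) {
            heapCopy.write(buffer, 0, read);
        }
        return new ByteArrayInputStream(heapCopy.toByteArray());
    }

    /** Lazy: hands the source through; bytes move only when the caller reads. */
    static InputStream lazy(InputStream source) {
        return source;
    }

    public static void main(String[] args) throws IOException {
        byte[] payload = new byte[1_000_000]; // stand-in for an attachment

        CountingInputStream a = new CountingInputStream(new ByteArrayInputStream(payload));
        eager(a);
        System.out.println("eager, pulled before caller reads: " + a.count());

        CountingInputStream b = new CountingInputStream(new ByteArrayInputStream(payload));
        InputStream in = lazy(b);
        System.out.println("lazy, pulled before caller reads: " + b.count());
        in.read(new byte[1024]);
        System.out.println("lazy, pulled after one 1024-byte read: " + b.count());
    }
}
```

The logs in the question match the eager pattern: the whole 64541873-byte part is consumed before control returns to the test code.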