How to read an InputStream from 'in' and write an OutputStream to 'out' in a Camel route's exchange?

This is usually done like this:

try (InputStream is = ...;
     OutputStream os = ...) {
  int b;
  while ((b = is.read()) != -1) {
    // do something with the byte
    os.write(b);
  }
}
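For reference, here is a minimal plain-JDK sketch of the same loop that reads into a byte[] buffer instead of one byte per call. It runs against in-memory streams so it can be tried without Camel. The class name StreamCopy, the method names and the 8192-byte buffer size are all hypothetical and chosen only for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of Camel: copies a stream chunk by chunk.
final class StreamCopy {

    // Reads from 'in' until end of stream and writes every byte to 'out'.
    // Returns the number of bytes copied.
    static long copy(InputStream in, OutputStream out) throws IOException {
        byte[] buffer = new byte[8192]; // assumed buffer size
        long total = 0;
        int n;
        while ((n = in.read(buffer)) != -1) {
            // do something with buffer[0..n) here
            out.write(buffer, 0, n);
            total += n;
        }
        return total;
    }

    // Convenience wrapper over in-memory streams, mainly for trying the loop out.
    static byte[] roundTrip(byte[] data) {
        try (InputStream is = new ByteArrayInputStream(data);
             ByteArrayOutputStream os = new ByteArrayOutputStream()) {
            copy(is, os);
            return os.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] copied = roundTrip("hello camel".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(copied, StandardCharsets.UTF_8));
    }
}
```

The open question for the route is only where the OutputStream comes from and how its content ends up in the outgoing message; the loop itself stays the same.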

In RouteBuilder.configure() I have the following:

from("file:...")
  ...
  .to("direct:second")

from("direct:second")
  ...
  .process(exchange -> {
    try (InputStream is1 = 
           new BufferedInputStream(new FileInputStream(exchange.getIn().getBody(File.class)));
         InputStream is2 = exchange.getIn().getBody(BufferedInputStream.class);
         // OutputStream os = ???
    ){
      int b;
      while ((b = [is1|is2].read()) != -1) {
        System.out.print(b); // works

        // now how to obtain the OutputStream, connect it to 'out' and write to it?

      }
    }
  })
  .to("direct:third")

from("direct:third")
  ...

I have read docs, blogs, tutorials and SO answers about getIn(), getOut(), Message Translator and transform(), to no avail.

Update: I also looked at src/test/java/org/apache/camel/processor, especially StreamCachingInOutTest. The Processor there only reads the InputStream.

Follow-up question:

Is:

exchange.getIn().getBody(BufferedInputStream.class)

the same as:

new BufferedInputStream(new FileInputStream(exchange.getIn().getBody(File.class)))

if the original from(...) is "file:..."?

UPDATE

I tried the following:

try (...;
     final OutputStream os = new ByteArrayOutputStream()
){
  while (b = is.read() ...) {
    ...
    os.write(b);
  }
  exchange.getOut().setBody(os, ByteArrayOutputStream.class);
}

which results in:

Caused by: org.apache.camel.InvalidPayloadException:
  No body available of type: java.io.InputStream but has value:
    of type: org.apache.commons.io.output.ByteArrayOutputStream on: Message[].
   Caused by: No type converter available to convert from type:
     org.apache.commons.io.output.ByteArrayOutputStream to the required type:
       java.io.InputStream with value . Exchange[ID-R05377-1542620554174-0-4].
     Caused by: [org.apache.camel.NoTypeConversionAvailableException -
       No type converter available to convert from type:
         org.apache.commons.io.output.ByteArrayOutputStream to the
           required type: java.io.InputStream with value ]
at org.apache.camel.impl.MessageSupport.getMandatoryBody(MessageSupport.java:117)
at org.apache.camel.component.file.FileOperations.storeFile(FileOperations.java:333)
... 17 more
Caused by: org.apache.camel.NoTypeConversionAvailableException:
  No type converter available to convert from type:
    org.apache.commons.io.output.ByteArrayOutputStream to the required type:
      java.io.InputStream with value 
at org.apache.camel.impl.converter.BaseTypeConverterRegistry.mandatoryConvertTo(BaseTypeConverterRegistry.java:206)
at org.apache.camel.impl.MessageSupport.getMandatoryBody(MessageSupport.java:115)
... 18 more
1849 [Camel (camel-1) thread #2 - Multicast] ERROR org.apache.camel.processor.DefaultErrorHandler  -
  Failed delivery for (MessageId: ID-R05377-1542620554174-0-7 on
    ExchangeId: ID-R05377-1542620554174-0-4). Exhausted after delivery attempt:
      1 caught: org.apache.camel.component.file.GenericFileOperationFailedException:
        Cannot store file: <... to(...) file path of direct:third here ...>

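UPDATE to the UPDATE

The exception was thrown because I used org.apache.commons.io.output.ByteArrayOutputStream instead of java.io.ByteArrayOutputStream. It works with the latter, which also seems to be the answer to the question.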
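The stack trace shows the file producer asking for the body as a java.io.InputStream. Below is a plain-JDK sketch of that OutputStream-to-InputStream step. It is presumably close to what Camel's converter for java.io.ByteArrayOutputStream does, but that is an assumption and has not been checked against Camel's source. The class name OutToIn and its methods are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of Camel.
final class OutToIn {

    // Wraps the bytes collected so far in a fresh, readable InputStream.
    static InputStream toInputStream(ByteArrayOutputStream os) {
        return new ByteArrayInputStream(os.toByteArray());
    }

    // Writes 'text' to an in-memory OutputStream, then reads it back through
    // the InputStream view to show that nothing is lost on the way.
    static String writeThenRead(String text) {
        byte[] bytes = text.getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream os = new ByteArrayOutputStream();
        os.write(bytes, 0, bytes.length); // this overload declares no IOException

        ByteArrayOutputStream readBack = new ByteArrayOutputStream();
        try (InputStream is = toInputStream(os)) {
            int b;
            while ((b = is.read()) != -1) {
                readBack.write(b);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(readBack.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(writeThenRead("written via OutputStream"));
    }
}
```

In a processor, passing such an InputStream (or the byte[] from toByteArray()) to setBody(...) should avoid depending on a converter for the OutputStream class at all, since the body then already has a type the consumer can read.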

See the Stream component (http://camel.apache.org/stream.html) example:

// Route messages to the standard output. 
from("direct:in")
.to("stream:out");

import java.io.ByteArrayOutputStream;

// DO NOT USE THIS IN CONJUNCTION WITH CAMEL!
//import org.apache.commons.io.output.ByteArrayOutputStream;

...    

    from(...)
      ... 
      .streamCaching()
      .process(exchange -> {
        ...
        try (final InputStream is = exchange.getIn().getBody(InputStream.class);
             final OutputStream os = new ByteArrayOutputStream(OUTSTREAM_BUFFER)) {
          while (b = is.read() ...) {

            // do something with the byte

            os.write(b);
          }
          exchange.getOut().setBody(os, OutputStream.class);
        }
      })
      ...