如何在 Camel 路由交换中从 'in' 读取 InputStream 并将 OutputStream 写入 'out'?
How to read an InputStream from 'in' and write an OutputStream to 'out' in a Camel route's exchange?
通常是这样完成的:
try (InputStream is = ...;
OutputStream os = ...) {
int b;
while ((b = is.read()) != -1) {
// do something with the byte
os.write(b);
}
}
在 RouteBuilder
的 configure()
中,我有以下内容:
from("file:...")
...
to("direct:second")
from("direct:second")
...
.process(exchange -> {
try (InputStream is1 =
new BufferedInputStream(new FileInputStream(exchange.getIn().getBody(File.class));
InputStream is2 = exchange.getIn().getBody(BufferedInputStream.class);
// OutputStream os = ???
){
int b;
while ((b = [is1|is2].read()) != -1) {
System.out.print(b); // works
// now how to obtain the OutputStream, connect it to 'out' and write to it?
}
}
})
.to("direct:third")
from("direct:third")
...
我阅读了有关 getIn()
、getOut()
、Message Translator、transform()
、[=23 的文档、博客、教程和 SO 答案=] 无济于事。
更新: 我也看了src/test/java/org/apache/camel/processor,尤其是StreamCachingInOutTest
。 Processor
只是读取 InputStream
.
后续问题:
是:
exchange.getIn().getBody(BufferedInputStream.class)
同于:
new BufferedInputStream(new FileInputStream(exchange.getIn().getBody(File.class))
如果原来的from(...)
是"file:..."
?
更新
我尝试了以下方法:
try (...;
final OutputStream os = new ByteArrayOutputStream()
){
while (b = is.read() ...) {
...
os.write(b);
}
exchange.getOut().setBody(os, ByteArrayOutputStream.class);
}
结果是:
Caused by: org.apache.camel.InvalidPayloadException:
No body available of type: java.io.InputStream but has value:
of type: org.apache.commons.io.output.ByteArrayOutputStream on: Message[].
Caused by: No type converter available to convert from type:
org.apache.commons.io.output.ByteArrayOutputStream to the required type:
java.io.InputStream with value . Exchange[ID-R05377-1542620554174-0-4].
Caused by: [org.apache.camel.NoTypeConversionAvailableException -
No type converter available to convert from type:
org.apache.commons.io.output.ByteArrayOutputStream to the
required type: java.io.InputStream with value ]
at org.apache.camel.impl.MessageSupport.getMandatoryBody(MessageSupport.java:117)
at org.apache.camel.component.file.FileOperations.storeFile(FileOperations.java:333)
... 17 more
Caused by: org.apache.camel.NoTypeConversionAvailableException:
No type converter available to convert from type:
org.apache.commons.io.output.ByteArrayOutputStream to the required type:
java.io.InputStream with value
at org.apache.camel.impl.converter.BaseTypeConverterRegistry.mandatoryConvertTo(BaseTypeConverterRegistry.java:206)
at org.apache.camel.impl.MessageSupport.getMandatoryBody(MessageSupport.java:115)
... 18 more
1849 [Camel (camel-1) thread #2 - Multicast] ERROR org.apache.camel.processor.DefaultErrorHandler -
Failed delivery for (MessageId: ID-R05377-1542620554174-0-7 on
ExchangeId: ID-R05377-1542620554174-0-4). Exhausted after delivery attempt:
1 caught: org.apache.camel.component.file.GenericFileOperationFailedException:
Cannot store file: <... to(...) file path of direct:third here ...>
更新更新
抛出异常是因为我使用了org.apache.commons.io.output.ByteArrayOutputStream
而不是java.io.ByteArrayOutputStream
。它适用于后者,这似乎也是问题的答案。
查看流组件 (http://camel.apache.org/stream.html)
示例:
// Route messages to the standard output.
from("direct:in")
.to("stream:out");
import java.io.ByteArrayOutputStream;
// DO NOT USE THIS IN CONJUNCTION WITH CAMEL!
//import org.apache.commons.io.output.ByteArrayOutputStream;
...
from(...)
...
.streamCaching()
.process(exchange -> {
...
try (final InputStream is = exchange.getIn().getBody(InputStream.class);
final OutputStream os = new ByteArrayOutputStream(OUTSTREAM_BUFFER)) {
while (b = is.read() ...) {
// do something with the byte
os.write(b);
}
exchange.getOut().setBody(os, OutputStream.class);
}
})
...
通常是这样完成的:
try (InputStream is = ...;
OutputStream os = ...) {
int b;
while ((b = is.read()) != -1) {
// do something with the byte
os.write(b);
}
}
在 RouteBuilder
的 configure()
中,我有以下内容:
from("file:...")
...
to("direct:second")
from("direct:second")
...
.process(exchange -> {
try (InputStream is1 =
new BufferedInputStream(new FileInputStream(exchange.getIn().getBody(File.class));
InputStream is2 = exchange.getIn().getBody(BufferedInputStream.class);
// OutputStream os = ???
){
int b;
while ((b = [is1|is2].read()) != -1) {
System.out.print(b); // works
// now how to obtain the OutputStream, connect it to 'out' and write to it?
}
}
})
.to("direct:third")
from("direct:third")
...
我阅读了有关 getIn()
、getOut()
、Message Translator、transform()
、[=23 的文档、博客、教程和 SO 答案=] 无济于事。
更新: 我也看了src/test/java/org/apache/camel/processor,尤其是StreamCachingInOutTest
。 Processor
只是读取 InputStream
.
后续问题:
是:
exchange.getIn().getBody(BufferedInputStream.class)
同于:
new BufferedInputStream(new FileInputStream(exchange.getIn().getBody(File.class))
如果原来的from(...)
是"file:..."
?
更新
我尝试了以下方法:
try (...;
final OutputStream os = new ByteArrayOutputStream()
){
while (b = is.read() ...) {
...
os.write(b);
}
exchange.getOut().setBody(os, ByteArrayOutputStream.class);
}
结果是:
Caused by: org.apache.camel.InvalidPayloadException:
No body available of type: java.io.InputStream but has value:
of type: org.apache.commons.io.output.ByteArrayOutputStream on: Message[].
Caused by: No type converter available to convert from type:
org.apache.commons.io.output.ByteArrayOutputStream to the required type:
java.io.InputStream with value . Exchange[ID-R05377-1542620554174-0-4].
Caused by: [org.apache.camel.NoTypeConversionAvailableException -
No type converter available to convert from type:
org.apache.commons.io.output.ByteArrayOutputStream to the
required type: java.io.InputStream with value ]
at org.apache.camel.impl.MessageSupport.getMandatoryBody(MessageSupport.java:117)
at org.apache.camel.component.file.FileOperations.storeFile(FileOperations.java:333)
... 17 more
Caused by: org.apache.camel.NoTypeConversionAvailableException:
No type converter available to convert from type:
org.apache.commons.io.output.ByteArrayOutputStream to the required type:
java.io.InputStream with value
at org.apache.camel.impl.converter.BaseTypeConverterRegistry.mandatoryConvertTo(BaseTypeConverterRegistry.java:206)
at org.apache.camel.impl.MessageSupport.getMandatoryBody(MessageSupport.java:115)
... 18 more
1849 [Camel (camel-1) thread #2 - Multicast] ERROR org.apache.camel.processor.DefaultErrorHandler -
Failed delivery for (MessageId: ID-R05377-1542620554174-0-7 on
ExchangeId: ID-R05377-1542620554174-0-4). Exhausted after delivery attempt:
1 caught: org.apache.camel.component.file.GenericFileOperationFailedException:
Cannot store file: <... to(...) file path of direct:third here ...>
更新更新
抛出异常是因为我使用了org.apache.commons.io.output.ByteArrayOutputStream
而不是java.io.ByteArrayOutputStream
。它适用于后者,这似乎也是问题的答案。
查看流组件 (http://camel.apache.org/stream.html) 示例:
// Route messages to the standard output.
from("direct:in")
.to("stream:out");
import java.io.ByteArrayOutputStream;
// DO NOT USE THIS IN CONJUNCTION WITH CAMEL!
//import org.apache.commons.io.output.ByteArrayOutputStream;
...
from(...)
...
.streamCaching()
.process(exchange -> {
...
try (final InputStream is = exchange.getIn().getBody(InputStream.class);
final OutputStream os = new ByteArrayOutputStream(OUTSTREAM_BUFFER)) {
while (b = is.read() ...) {
// do something with the byte
os.write(b);
}
exchange.getOut().setBody(os, OutputStream.class);
}
})
...