骆驼消耗用base64编码的大文件
Camel consuming large file encoded with base64
我正在尝试使用 apache camel 读取一个大文件。文件的内容是 base64 编码的,我想对其进行解码。读取基于 ASCII 的大型文本文件没有问题,但是这个...
在这种情况下我不能在行尾拆分它,因为base64行尾不是我数据的行尾(数据是csv)。稍后我想操作数据。
使用以下骆驼路线它可以工作,但因此文件必须存在且未锁定,当路线开始时,否则我会收到 IllegalArgumentException。但我不知道,该文件何时会存在。
编辑:我忘记了:骆驼版本:2.14.1 | java: 1.7
我需要解决我的问题,有人可以帮助我吗?
from("stream:file?fileName=/path/to/file&scanStream=true")
.routeId(ROUTE_ID_BASE64)
.unmarshal().base64()
.setHeader("foo", constant("foo"))
.aggregate(header("foo"), new StringBodyAggregator())
.completionSize(1000)
.completionTimeout(1500)
.to("file:" + this.tempPfad + "?fileName=" + this.tempDateiname + "&charset=utf-8&fileExist=Append");
我也试过:
from("file:" + this.eingabePfad + "?filter=#dateinamenFilter&maxDepth=1&readLock=changed&noop=true&charset=utf-8")
.routeId(ROUTE_ID_BASE64)
.unmarshal().base64()
.setHeader("foo", constant("foo"))
.aggregate(header("foo"), new StringBodyAggregator())
.completionSize(1000)
.completionTimeout(1500)
.to("file:" + this.tempPfad + "?fileName=" + this.tempDateiname + "&charset=utf-8&fileExist=Append");
但随后出现以下错误:
ERROR Failed delivery for (MessageId: ID-smith-52872-1421933301634-0-1 on ExchangeId: ID-smith-52872-1421933301634-0-3). Exhausted after delivery attempt: 2 caught: org.apache.camel.component.file.GenericFileOperationFailedException: Cannot store file: /path/to/temp/tempDatei.csv. Processed by failure processor: FatalFallbackErrorHandler[Channel[DelegateSync[yy.xxxxx.myproject.routes.DateiPolling@5734ea]]]
Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId ProcessorId Processor Elapsed (ms)
[routeBase64 ] [routeBase64 ] [file:///path/to/?charset=utf-8&filter=%23datein] [ 1998]
[routeBase64 ] [unmarshal2 ] [unmarshal[org.apache.camel.model.dataformat.Base64DataFormat@153097e] ] [ 5]
[routeBase64 ] [setHeader1 ] [setHeader[foo] ] [ 0]
[routeBase64 ] [aggregate1 ] [aggregate[header(foo)] ] [ 0]
[routeBase64 ] [to1 ] [file:/path/to/temp/?fileName=tempDatei.csv&char] [ 24]
[ ] [process1 ] [yy.xxxxx.myproject.routes.DateiPolling@5734ea ] [ 15]
Exchange
---------------------------------------------------------------------------------------------------------------------------------------
Exchange[
Id ID-smith-52872-1421933301634-0-3
ExchangePattern InOnly
Headers {breadcrumbId=ID-smith-52872-1421933301634-0-1, CamelFileAbsolute=true, CamelFileAbsolutePath=/path/to/base64File.txt, CamelFileLastModified=1417777182000, CamelFileLength=267439971, CamelFileName=base64File.txt, CamelFileNameConsumed=base64File.txt, CamelFileNameOnly=base64File.txt, CamelFileParent=/path/to, CamelFilePath=/path/to/base64File.txt, CamelFileRelativePath=base64File.txt, CamelRedelivered=true, CamelRedeliveryCounter=1, CamelRedeliveryMaxCounter=1, foo=foo}
BodyType org.apache.commons.codec.binary.Base64InputStream
Body [Body is instance of java.io.InputStream]
]
堆栈跟踪
---------------------------------------------- ---------------------------------------------- ----------------------------------
org.apache.camel.component.file.GenericFileOperationFailedException:无法存储文件:/path/to/temp/tempDatei.csv
在 org.apache.camel.component.file.FileOperations.storeFile(FileOperations.java:276)
在 org.apache.camel.component.file.GenericFileProducer.writeFile(GenericFileProducer.java:277)
在 org.apache.camel.component.file.GenericFileProducer.processExchange(GenericFileProducer.java:165)
在 org.apache.camel.component.file.GenericFileProducer.process(GenericFileProducer.java:79)
在 org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61)
在 org.apache.camel.processor.SendProcessor.process(SendProcessor.java:120)
在 org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:416)
在 org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
在 org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
在 org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:105)
在 org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:87)
在 org.apache.camel.processor.aggregate.AggregateProcessor$1.run(AggregateProcessor.java:548)
在 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
在 java.util.concurrent.FutureTask.run(FutureTask.java:262)
在 org.apache.camel.util.concurrent.SynchronousExecutorService.execute(SynchronousExecutorService.java:62)
在 java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:110)
在 org.apache.camel.processor.aggregate.AggregateProcessor.onSubmitCompletion(AggregateProcessor.java:540)
在 org.apache.camel.processor.aggregate.AggregateProcessor.access$900(AggregateProcessor.java:82)
在 org.apache.camel.processor.aggregate.AggregateProcessor$AggregationTimeoutMap.onEviction(AggregateProcessor.java:853)
在 org.apache.camel.processor.aggregate.AggregateProcessor$AggregationTimeoutMap.onEviction(AggregateProcessor.java:814)
在 org.apache.camel.support.DefaultTimeoutMap.purge(DefaultTimeoutMap.java:212)
在 org.apache.camel.processor.aggregate.AggregateProcessor$AggregationTimeoutMap.purge(AggregateProcessor.java:826)
在 org.apache.camel.support.DefaultTimeoutMap.run(DefaultTimeoutMap.java:162)
在 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
在 java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
在 java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
在 java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
在 java.lang.Thread.run(Thread.java:745)
原因:java.io.IOException:流关闭
在 java.io.BufferedReader.ensureOpen(BufferedReader.java:115)
在 java.io.BufferedReader.read(BufferedReader.java:172)
在 org.apache.camel.converter.IOConverter$1.read(IOConverter.java:83)
在 java.io.InputStream.read(InputStream.java:170)
在 java.io.InputStream.read(InputStream.java:101)
在 org.apache.commons.codec.binary.BaseNCodecInputStream.read(BaseNCodecInputStream.java:158)
在 sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:283)
在 sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:325)
在 sun.nio.cs.StreamDecoder.read(StreamDecoder.java:177)
在 java.io.InputStreamReader.read(InputStreamReader.java:184)
在 java.io.BufferedReader.read1(BufferedReader.java:203)
在 java.io.BufferedReader.read(BufferedReader.java:279)
在 java.io.Reader.read(Reader.java:140)
在 org.apache.camel.util.IOHelper.copy(IOHelper.java:224)
在 org.apache.camel.component.file.FileOperations.writeFileByReaderWithCharset(FileOperations.java:402)
在 org.apache.camel.component.file.FileOperations.storeFile(FileOperations.java:266)
... 29 更多
2015-01-22 14:28:26,113 [eTimeoutChecker] AggregateProcessor WARN 处理聚合交换时出错。交换[base64File.txt]。原因:[org.apache.camel.component.file.GenericFileOperationFailedException - 无法存储文件:/path/to/temp/tempDatei.csv]
org.apache.camel.component.file.GenericFileOperationFailedException:无法存储文件:/path/to/temp/tempDatei.csv
在 org.apache.camel.component.file.FileOperations.storeFile(FileOperations.java:276)
在 org.apache.camel.component.file.GenericFileProducer.writeFile(GenericFileProducer.java:277)
在 org.apache.camel.component.file.GenericFileProducer.processExchange(GenericFileProducer.java:165)
在 org.apache.camel.component.file.GenericFileProducer.process(GenericFileProducer.java:79)
在 org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61)
在 org.apache.camel.processor.SendProcessor.process(SendProcessor.java:120)
在 org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:416)
在 org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
在 org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
在 org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:105)
在 org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:87)
在 org.apache.camel.processor.aggregate.AggregateProcessor$1.run(AggregateProcessor.java:548)
在 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
在 java.util.concurrent.FutureTask.run(FutureTask.java:262)
在 org.apache.camel.util.concurrent.SynchronousExecutorService.execute(SynchronousExecutorService.java:62)
在 java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:110)
在 org.apache.camel.processor.aggregate.AggregateProcessor.onSubmitCompletion(AggregateProcessor.java:540)
在 org.apache.camel.processor.aggregate.AggregateProcessor.access$900(AggregateProcessor.java:82)
在 org.apache.camel.processor.aggregate.AggregateProcessor$AggregationTimeoutMap.onEviction(AggregateProcessor.java:853)
在 org.apache.camel.processor.aggregate.AggregateProcessor$AggregationTimeoutMap.onEviction(AggregateProcessor.java:814)
在 org.apache.camel.support.DefaultTimeoutMap.purge(DefaultTimeoutMap.java:212)
在 org.apache.camel.processor.aggregate.AggregateProcessor$AggregationTimeoutMap.purge(AggregateProcessor.java:826)
在 org.apache.camel.support.DefaultTimeoutMap.run(DefaultTimeoutMap.java:162)
在 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
在 java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
在 java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
在 java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
在 java.lang.Thread.run(Thread.java:745)
原因:java.io.IOException:流关闭
在 java.io.BufferedReader.ensureOpen(BufferedReader.java:115)
在 java.io.BufferedReader.read(BufferedReader.java:172)
在 org.apache.camel.converter.IOConverter$1.read(IOConverter.java:83)
在 java.io.InputStream.read(InputStream.java:170)
在 java.io.InputStream.read(InputStream.java:101)
在 org.apache.commons.codec.binary.BaseNCodecInputStream.read(BaseNCodecInputStream.java:158)
在 sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:283)
在 sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:325)
在 sun.nio.cs.StreamDecoder.read(StreamDecoder.java:177)
在 java.io.InputStreamReader.read(InputStreamReader.java:184)
在 java.io.BufferedReader.read1(BufferedReader.java:203)
在 java.io.BufferedReader.read(BufferedReader.java:279)
在 java.io.Reader.read(Reader.java:140)
在 org.apache.camel.util.IOHelper.copy(IOHelper.java:224)
在 org.apache.camel.component.file.FileOperations.writeFileByReaderWithCharset(FileOperations.java:402)
在 org.apache.camel.component.file.FileOperations.storeFile(FileOperations.java:266)
... 29 更多
添加行
.split().tokenize(LINE_SEPARATOR).streaming()</pre>
现在可以使用了。重要的是 .end()</pre>
在路线的尽头,而不是在解组之后直接。
我还将 Aggregatorstrategy 从 String 更改为 byte[],以防止 UTF-8 编码错误。
完整路线:
from("file:" + this.eingabePfad + "?filter=#dateinamenFilter&maxDepth=1&readLock=changed&noop=true")
.routeId(ROUTE_ID_BASE64)
.split().tokenize(LINE_SEPARATOR).streaming()
.unmarshal().base64()
.setHeader("foo", constant("foo"))
.aggregate(header("foo"), new ByteArrayBodyAggregator())
.completionSize(1000)
.completionTimeout(1500)
.to("file:" + this.tempPfad + "?fileName=" + this.tempDateiname + "&charset=utf-8&fileExist=Append")
.end();
我正在尝试使用 apache camel 读取一个大文件。文件的内容是 base64 编码的,我想对其进行解码。读取基于 ASCII 的大型文本文件没有问题,但是这个...
在这种情况下我不能在行尾拆分它,因为base64行尾不是我数据的行尾(数据是csv)。稍后我想操作数据。
使用以下骆驼路线它可以工作,但因此文件必须存在且未锁定,当路线开始时,否则我会收到 IllegalArgumentException。但我不知道,该文件何时会存在。
编辑:我忘记了:骆驼版本:2.14.1 | java: 1.7
我需要解决我的问题,有人可以帮助我吗?
from("stream:file?fileName=/path/to/file&scanStream=true")
.routeId(ROUTE_ID_BASE64)
.unmarshal().base64()
.setHeader("foo", constant("foo"))
.aggregate(header("foo"), new StringBodyAggregator())
.completionSize(1000)
.completionTimeout(1500)
.to("file:" + this.tempPfad + "?fileName=" + this.tempDateiname + "&charset=utf-8&fileExist=Append");
我也试过:
from("file:" + this.eingabePfad + "?filter=#dateinamenFilter&maxDepth=1&readLock=changed&noop=true&charset=utf-8")
.routeId(ROUTE_ID_BASE64)
.unmarshal().base64()
.setHeader("foo", constant("foo"))
.aggregate(header("foo"), new StringBodyAggregator())
.completionSize(1000)
.completionTimeout(1500)
.to("file:" + this.tempPfad + "?fileName=" + this.tempDateiname + "&charset=utf-8&fileExist=Append");
但随后出现以下错误:
ERROR Failed delivery for (MessageId: ID-smith-52872-1421933301634-0-1 on ExchangeId: ID-smith-52872-1421933301634-0-3). Exhausted after delivery attempt: 2 caught: org.apache.camel.component.file.GenericFileOperationFailedException: Cannot store file: /path/to/temp/tempDatei.csv. Processed by failure processor: FatalFallbackErrorHandler[Channel[DelegateSync[yy.xxxxx.myproject.routes.DateiPolling@5734ea]]]
Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId ProcessorId Processor Elapsed (ms)
[routeBase64 ] [routeBase64 ] [file:///path/to/?charset=utf-8&filter=%23datein] [ 1998]
[routeBase64 ] [unmarshal2 ] [unmarshal[org.apache.camel.model.dataformat.Base64DataFormat@153097e] ] [ 5]
[routeBase64 ] [setHeader1 ] [setHeader[foo] ] [ 0]
[routeBase64 ] [aggregate1 ] [aggregate[header(foo)] ] [ 0]
[routeBase64 ] [to1 ] [file:/path/to/temp/?fileName=tempDatei.csv&char] [ 24]
[ ] [process1 ] [yy.xxxxx.myproject.routes.DateiPolling@5734ea ] [ 15]
Exchange
---------------------------------------------------------------------------------------------------------------------------------------
Exchange[
Id ID-smith-52872-1421933301634-0-3
ExchangePattern InOnly
Headers {breadcrumbId=ID-smith-52872-1421933301634-0-1, CamelFileAbsolute=true, CamelFileAbsolutePath=/path/to/base64File.txt, CamelFileLastModified=1417777182000, CamelFileLength=267439971, CamelFileName=base64File.txt, CamelFileNameConsumed=base64File.txt, CamelFileNameOnly=base64File.txt, CamelFileParent=/path/to, CamelFilePath=/path/to/base64File.txt, CamelFileRelativePath=base64File.txt, CamelRedelivered=true, CamelRedeliveryCounter=1, CamelRedeliveryMaxCounter=1, foo=foo}
BodyType org.apache.commons.codec.binary.Base64InputStream
Body [Body is instance of java.io.InputStream]
]
堆栈跟踪
---------------------------------------------- ---------------------------------------------- ----------------------------------
org.apache.camel.component.file.GenericFileOperationFailedException:无法存储文件:/path/to/temp/tempDatei.csv
在 org.apache.camel.component.file.FileOperations.storeFile(FileOperations.java:276)
在 org.apache.camel.component.file.GenericFileProducer.writeFile(GenericFileProducer.java:277)
在 org.apache.camel.component.file.GenericFileProducer.processExchange(GenericFileProducer.java:165)
在 org.apache.camel.component.file.GenericFileProducer.process(GenericFileProducer.java:79)
在 org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61)
在 org.apache.camel.processor.SendProcessor.process(SendProcessor.java:120)
在 org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:416)
在 org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
在 org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
在 org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:105)
在 org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:87)
在 org.apache.camel.processor.aggregate.AggregateProcessor$1.run(AggregateProcessor.java:548)
在 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
在 java.util.concurrent.FutureTask.run(FutureTask.java:262)
在 org.apache.camel.util.concurrent.SynchronousExecutorService.execute(SynchronousExecutorService.java:62)
在 java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:110)
在 org.apache.camel.processor.aggregate.AggregateProcessor.onSubmitCompletion(AggregateProcessor.java:540)
在 org.apache.camel.processor.aggregate.AggregateProcessor.access$900(AggregateProcessor.java:82)
在 org.apache.camel.processor.aggregate.AggregateProcessor$AggregationTimeoutMap.onEviction(AggregateProcessor.java:853)
在 org.apache.camel.processor.aggregate.AggregateProcessor$AggregationTimeoutMap.onEviction(AggregateProcessor.java:814)
在 org.apache.camel.support.DefaultTimeoutMap.purge(DefaultTimeoutMap.java:212)
在 org.apache.camel.processor.aggregate.AggregateProcessor$AggregationTimeoutMap.purge(AggregateProcessor.java:826)
在 org.apache.camel.support.DefaultTimeoutMap.run(DefaultTimeoutMap.java:162)
在 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
在 java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
在 java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
在 java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
在 java.lang.Thread.run(Thread.java:745)
原因:java.io.IOException:流关闭
在 java.io.BufferedReader.ensureOpen(BufferedReader.java:115)
在 java.io.BufferedReader.read(BufferedReader.java:172)
在 org.apache.camel.converter.IOConverter$1.read(IOConverter.java:83)
在 java.io.InputStream.read(InputStream.java:170)
在 java.io.InputStream.read(InputStream.java:101)
在 org.apache.commons.codec.binary.BaseNCodecInputStream.read(BaseNCodecInputStream.java:158)
在 sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:283)
在 sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:325)
在 sun.nio.cs.StreamDecoder.read(StreamDecoder.java:177)
在 java.io.InputStreamReader.read(InputStreamReader.java:184)
在 java.io.BufferedReader.read1(BufferedReader.java:203)
在 java.io.BufferedReader.read(BufferedReader.java:279)
在 java.io.Reader.read(Reader.java:140)
在 org.apache.camel.util.IOHelper.copy(IOHelper.java:224)
在 org.apache.camel.component.file.FileOperations.writeFileByReaderWithCharset(FileOperations.java:402)
在 org.apache.camel.component.file.FileOperations.storeFile(FileOperations.java:266)
... 29 更多
2015-01-22 14:28:26,113 [eTimeoutChecker] AggregateProcessor WARN 处理聚合交换时出错。交换[base64File.txt]。原因:[org.apache.camel.component.file.GenericFileOperationFailedException - 无法存储文件:/path/to/temp/tempDatei.csv]
org.apache.camel.component.file.GenericFileOperationFailedException:无法存储文件:/path/to/temp/tempDatei.csv
在 org.apache.camel.component.file.FileOperations.storeFile(FileOperations.java:276)
在 org.apache.camel.component.file.GenericFileProducer.writeFile(GenericFileProducer.java:277)
在 org.apache.camel.component.file.GenericFileProducer.processExchange(GenericFileProducer.java:165)
在 org.apache.camel.component.file.GenericFileProducer.process(GenericFileProducer.java:79)
在 org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61)
在 org.apache.camel.processor.SendProcessor.process(SendProcessor.java:120)
在 org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:416)
在 org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
在 org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
在 org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:105)
在 org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:87)
在 org.apache.camel.processor.aggregate.AggregateProcessor$1.run(AggregateProcessor.java:548)
在 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
在 java.util.concurrent.FutureTask.run(FutureTask.java:262)
在 org.apache.camel.util.concurrent.SynchronousExecutorService.execute(SynchronousExecutorService.java:62)
在 java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:110)
在 org.apache.camel.processor.aggregate.AggregateProcessor.onSubmitCompletion(AggregateProcessor.java:540)
在 org.apache.camel.processor.aggregate.AggregateProcessor.access$900(AggregateProcessor.java:82)
在 org.apache.camel.processor.aggregate.AggregateProcessor$AggregationTimeoutMap.onEviction(AggregateProcessor.java:853)
在 org.apache.camel.processor.aggregate.AggregateProcessor$AggregationTimeoutMap.onEviction(AggregateProcessor.java:814)
在 org.apache.camel.support.DefaultTimeoutMap.purge(DefaultTimeoutMap.java:212)
在 org.apache.camel.processor.aggregate.AggregateProcessor$AggregationTimeoutMap.purge(AggregateProcessor.java:826)
在 org.apache.camel.support.DefaultTimeoutMap.run(DefaultTimeoutMap.java:162)
在 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
在 java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
在 java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
在 java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
在 java.lang.Thread.run(Thread.java:745)
原因:java.io.IOException:流关闭
在 java.io.BufferedReader.ensureOpen(BufferedReader.java:115)
在 java.io.BufferedReader.read(BufferedReader.java:172)
在 org.apache.camel.converter.IOConverter$1.read(IOConverter.java:83)
在 java.io.InputStream.read(InputStream.java:170)
在 java.io.InputStream.read(InputStream.java:101)
在 org.apache.commons.codec.binary.BaseNCodecInputStream.read(BaseNCodecInputStream.java:158)
在 sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:283)
在 sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:325)
在 sun.nio.cs.StreamDecoder.read(StreamDecoder.java:177)
在 java.io.InputStreamReader.read(InputStreamReader.java:184)
在 java.io.BufferedReader.read1(BufferedReader.java:203)
在 java.io.BufferedReader.read(BufferedReader.java:279)
在 java.io.Reader.read(Reader.java:140)
在 org.apache.camel.util.IOHelper.copy(IOHelper.java:224)
在 org.apache.camel.component.file.FileOperations.writeFileByReaderWithCharset(FileOperations.java:402)
在 org.apache.camel.component.file.FileOperations.storeFile(FileOperations.java:266)
... 29 更多
添加行
.split().tokenize(LINE_SEPARATOR).streaming()</pre>
现在可以使用了。重要的是.end()</pre>
在路线的尽头,而不是在解组之后直接。我还将 Aggregatorstrategy 从 String 更改为 byte[],以防止 UTF-8 编码错误。
完整路线:
from("file:" + this.eingabePfad + "?filter=#dateinamenFilter&maxDepth=1&readLock=changed&noop=true") .routeId(ROUTE_ID_BASE64) .split().tokenize(LINE_SEPARATOR).streaming() .unmarshal().base64() .setHeader("foo", constant("foo")) .aggregate(header("foo"), new ByteArrayBodyAggregator()) .completionSize(1000) .completionTimeout(1500) .to("file:" + this.tempPfad + "?fileName=" + this.tempDateiname + "&charset=utf-8&fileExist=Append") .end();