骆驼 SFTP 文件处理问题
Camel SFTP file processing issue
我正在尝试使用 camel ftp 组件从 SFTP 服务器获取文件并在自定义中处理它 way.I 我能够成功连接到 sftp 服务器并获取文件但是交换主体包含远程文件对象,文件包含 com.jcraft.jsch.ChannelSftp$LsEntry 对象,当我在调试模式下检查文件对象时,它只包含有关文件的元数据信息。我在将其转换为 file.How 时遇到类型转换异常,我是否从交换对象中获取文件以进行进一步的自定义处理。
当我使用文件而不是 SFTP 时,一切正常。
骆驼路线:
<camelContext id="SourceContext" xmlns="http://camel.apache.org/schema/spring">
<routeContextRef ref="SourceTranslatorRouteContext" />
<threadPoolProfile customId="true"
id="SourceSplitThreadProfile" maxPoolSize="100" maxQueueSize="0"
poolSize="20" />
<route id="SourceOutWriteConsumerRoute" streamCache="true">
<from id="SourceEndpoint"
uri="sftp:{{SourceFtpHostname}}:22/{{directoryName}}?siteCommand=NAMEFMT
1&stepwise=false&fileName={{fileName}}&password={{SourceFtpPWD}}&username={{SourceFtpUname}}&useList=false&delete=true" />
<!-- <from id="SourceEndpoint" uri="file:{{directoryName}}?fileName={{fileName}}"/> -->
<removeHeaders id="_removeHeaders1" pattern="Camel*" />
<doTry id="_doTry1">
<setProperty id="_setProperty1" propertyName="policySublobGroup">
<simple>{{policySublobGroup}}</simple>
</setProperty>
<split id="_split1" parallelProcessing="true" streaming="true">
<method bean="customSplitter" method="splitPolicy" />
</split>
<doCatch id="_doCatch1">
<exception>java.lang.Exception</exception>
<handled>
<constant>true</constant>
</handled>
<
</doCatch>
</doTry>
</route>
</camelContext>
Java自定义处理csv的方法
public static List<List<Policy>> splitPolicy(final Exchange exchange) {
// String line = "";
GenericFile file = exchange.getIn().getBody(GenericFile.class);
InputStream is = null;
BufferedReader br = null;
List<List<Policy>> splitList = new ArrayList<List<Policy>>();
try {
is = new FileInputStream((File) file.getFile());
br = new BufferedReader(new InputStreamReader(is));
br.readLine();
for (String line = br.readLine(); line != null; line = br.readLine()) {
String[] Details = line.split(";");
//logic to add splitList
}
} catch (IOException e) {
LOGGER.error("IOException occured in splitPolicy", e);
} catch (Exception e) {
LOGGER.error("Exception occured in splitPolicy", e);
} finally {
try {
is.close();
br.close();
} catch (IOException e) {
LOGGER.error("Error occured in while closing resource in method splitPolicy ", e);
}
}
return splitList;
}
}
异常:
04:00:03,236 ERROR (Camel (SOURCESYSTEMContext) thread #43 - sftp://SOURCESYSTEM.company.parentcompany:22/home/company/SOURCESYSTEM/TEST/OUT/MOAPP) Exception occured in splitPolicy: java.lang.ClassCastException: com.jcraft.jsch.ChannelSftp$LsEntry cannot be cast to java.io.File
at com.company.esb.SOURCESYSTEM.CustomSplitter.splitPolicy(CustomSplitter.java:51) [classes:]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) [rt.jar:1.8.0_151]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) [rt.jar:1.8.0_151]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) [rt.jar:1.8.0_151]
at java.lang.reflect.Method.invoke(Method.java:498) [rt.jar:1.8.0_151]
at org.apache.camel.component.bean.MethodInfo.invoke(MethodInfo.java:408) [camel-core-2.17.0.redhat-630283.jar:2.17.0.redhat-630283]
at org.apache.camel.component.bean.MethodInfo.doProceed(MethodInfo.java:279) [camel-core-2.17.0.redhat-630283.jar:2.17.0.redhat-630283]
at org.apache.camel.component.bean.MethodInfo.proceed(MethodInfo.java:252) [camel-core-2.17.0.redhat-630283.jar:2.17.0.redhat-630283]
at org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:177) [camel-core-2.17.0.redhat-630283.jar:2.17.0.redhat-630283]
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:109) [camel-core-2.17.0.redhat-630283.jar:2.17.0.redhat-630283]
at org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:68) [camel-core-2.17.0.redhat-630283.jar:2.17.0.redhat-630283]
at org.apache.camel.language.bean.BeanExpression$InvokeProcessor.process(BeanExpression.java:211) [camel-core-2.17.0.redhat-630283.jar:2.17.0.redhat-630283]
at org.apache.camel.language.bean.BeanExpression.evaluate(BeanExpression.java:126) [camel-core-2.17.0.redhat-630283.jar:2.17.0.redhat-630283]
at org.apache.camel.language.bean.BeanExpression.evaluate(BeanExpression.java:138) [camel-core-2.17.0.redhat-630283.jar:2.17.0.redhat-630283]
at org.apache.camel.processor.Splitter.createProcessorExchangePairs(Splitter.java:113) [camel-core-2.17.0.redhat-630283.jar:2.17.0.redhat-630283]
at org.apache.camel.processor.MulticastProcessor.process(MulticastProcessor.java:231) [camel-core-2.17.0.redhat-630283.jar:2.17.0.redhat-630283]
at org.apache.camel.processor.Splitter.process(Splitter.java:108) [camel-core-2.17.0.redhat-630283.jar:2.17.0.redhat-630283]
at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77) [camel-core-2.17.0.redhat-630283.jar:2.17.0.redhat-630283]
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:196) [camel-core-2.17.0.redhat-630283.jar:2.17.0.redhat-630283]
at org.apache.camel.processor.Pipeline.process(Pipeline.java:121) [camel-core-2.17.0.redhat-630283.jar:2.17.0.redhat-630283]
at org.apache.camel.processor.Pipeline.process(Pipeline.java:83) [camel-core-2.17.0.redhat-630283.jar:2.17.0.redhat-630283]
at org.apache.camel.processor.TryProcessor.process(TryProcessor.java:113) [camel-core-2.17.0.redhat-630283.jar:2.17.0.redhat-630283]
at org.apache.camel.processor.TryProcessor.process(TryProcessor.java:84) [camel-core-2.17.0.redhat-630283.jar:2.17.0.redhat-630283]
at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77) [camel-core-2.17.0.redhat-630283.jar:2.17.0.redhat-630283]
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:196) [camel-core-2.17.0.redhat-630283.jar:2.17.0.redhat-630283]
at org.apache.camel.processor.Pipeline.process(Pipeline.java:121) [camel-core-2.17.0.redhat-630283.jar:2.17.0.redhat-630283]
at org.apache.camel.processor.Pipeline.process(Pipeline.java:83) [camel-core-2.17.0.redhat-630283.jar:2.17.0.redhat-630283]
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:109) [camel-core-2.17.0.redhat-630283.jar:2.17.0.redhat-630283]
at org.apache.camel.processor.Pipeline.process(Pipeline.java:63) [camel-core-2.17.0.redhat-630283.jar:2.17.0.redhat-630283]
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:171) [camel-core-2.17.0.redhat-630283.jar:2.17.0.redhat-630283]
at org.apache.camel.component.file.GenericFileConsumer.processExchange(GenericFileConsumer.java:454) [camel-core-2.17.0.redhat-630283.jar:2.17.0.redhat-630283]
at org.apache.camel.component.file.remote.RemoteFileConsumer.processExchange(RemoteFileConsumer.java:137) [camel-ftp-2.17.0.redhat-630262.jar:2.17.0.redhat-630262]
at org.apache.camel.component.file.GenericFileConsumer.processBatch(GenericFileConsumer.java:226) [camel-core-2.17.0.redhat-630283.jar:2.17.0.redhat-630283]
at org.apache.camel.component.file.GenericFileConsumer.poll(GenericFileConsumer.java:190) [camel-core-2.17.0.redhat-630283.jar:2.17.0.redhat-630283]
at org.apache.camel.impl.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:175) [camel-core-2.17.0.redhat-630283.jar:2.17.0.redhat-630283]
at org.apache.camel.impl.ScheduledPollConsumer.run(ScheduledPollConsumer.java:102) [camel-core-2.17.0.redhat-630283.jar:2.17.0.redhat-630283]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [rt.jar:1.8.0_151]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [rt.jar:1.8.0_151]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access1(ScheduledThreadPoolExecutor.java:180) [rt.jar:1.8.0_151]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [rt.jar:1.8.0_151]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [rt.jar:1.8.0_151]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [rt.jar:1.8.0_151]
at java.lang.Thread.run(Thread.java:748) [rt.jar:1.8.0_151]
只需要让 Camel 获取消息体作为输入流,或者 String 等
exchange.getIn().getBody(InputStream.class);
我正在尝试使用 camel ftp 组件从 SFTP 服务器获取文件并在自定义中处理它 way.I 我能够成功连接到 sftp 服务器并获取文件但是交换主体包含远程文件对象,文件包含 com.jcraft.jsch.ChannelSftp$LsEntry 对象,当我在调试模式下检查文件对象时,它只包含有关文件的元数据信息。我在将其转换为 file.How 时遇到类型转换异常,我是否从交换对象中获取文件以进行进一步的自定义处理。
当我使用文件而不是 SFTP 时,一切正常。
骆驼路线:
<camelContext id="SourceContext" xmlns="http://camel.apache.org/schema/spring">
<routeContextRef ref="SourceTranslatorRouteContext" />
<threadPoolProfile customId="true"
id="SourceSplitThreadProfile" maxPoolSize="100" maxQueueSize="0"
poolSize="20" />
<route id="SourceOutWriteConsumerRoute" streamCache="true">
<from id="SourceEndpoint"
uri="sftp:{{SourceFtpHostname}}:22/{{directoryName}}?siteCommand=NAMEFMT
1&stepwise=false&fileName={{fileName}}&password={{SourceFtpPWD}}&username={{SourceFtpUname}}&useList=false&delete=true" />
<!-- <from id="SourceEndpoint" uri="file:{{directoryName}}?fileName={{fileName}}"/> -->
<removeHeaders id="_removeHeaders1" pattern="Camel*" />
<doTry id="_doTry1">
<setProperty id="_setProperty1" propertyName="policySublobGroup">
<simple>{{policySublobGroup}}</simple>
</setProperty>
<split id="_split1" parallelProcessing="true" streaming="true">
<method bean="customSplitter" method="splitPolicy" />
</split>
<doCatch id="_doCatch1">
<exception>java.lang.Exception</exception>
<handled>
<constant>true</constant>
</handled>
<
</doCatch>
</doTry>
</route>
</camelContext>
Java自定义处理csv的方法
public static List<List<Policy>> splitPolicy(final Exchange exchange) {
// String line = "";
GenericFile file = exchange.getIn().getBody(GenericFile.class);
InputStream is = null;
BufferedReader br = null;
List<List<Policy>> splitList = new ArrayList<List<Policy>>();
try {
is = new FileInputStream((File) file.getFile());
br = new BufferedReader(new InputStreamReader(is));
br.readLine();
for (String line = br.readLine(); line != null; line = br.readLine()) {
String[] Details = line.split(";");
//logic to add splitList
}
} catch (IOException e) {
LOGGER.error("IOException occured in splitPolicy", e);
} catch (Exception e) {
LOGGER.error("Exception occured in splitPolicy", e);
} finally {
try {
is.close();
br.close();
} catch (IOException e) {
LOGGER.error("Error occured in while closing resource in method splitPolicy ", e);
}
}
return splitList;
}
}
异常:
04:00:03,236 ERROR (Camel (SOURCESYSTEMContext) thread #43 - sftp://SOURCESYSTEM.company.parentcompany:22/home/company/SOURCESYSTEM/TEST/OUT/MOAPP) Exception occured in splitPolicy: java.lang.ClassCastException: com.jcraft.jsch.ChannelSftp$LsEntry cannot be cast to java.io.File
at com.company.esb.SOURCESYSTEM.CustomSplitter.splitPolicy(CustomSplitter.java:51) [classes:]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) [rt.jar:1.8.0_151]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) [rt.jar:1.8.0_151]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) [rt.jar:1.8.0_151]
at java.lang.reflect.Method.invoke(Method.java:498) [rt.jar:1.8.0_151]
at org.apache.camel.component.bean.MethodInfo.invoke(MethodInfo.java:408) [camel-core-2.17.0.redhat-630283.jar:2.17.0.redhat-630283]
at org.apache.camel.component.bean.MethodInfo.doProceed(MethodInfo.java:279) [camel-core-2.17.0.redhat-630283.jar:2.17.0.redhat-630283]
at org.apache.camel.component.bean.MethodInfo.proceed(MethodInfo.java:252) [camel-core-2.17.0.redhat-630283.jar:2.17.0.redhat-630283]
at org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:177) [camel-core-2.17.0.redhat-630283.jar:2.17.0.redhat-630283]
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:109) [camel-core-2.17.0.redhat-630283.jar:2.17.0.redhat-630283]
at org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:68) [camel-core-2.17.0.redhat-630283.jar:2.17.0.redhat-630283]
at org.apache.camel.language.bean.BeanExpression$InvokeProcessor.process(BeanExpression.java:211) [camel-core-2.17.0.redhat-630283.jar:2.17.0.redhat-630283]
at org.apache.camel.language.bean.BeanExpression.evaluate(BeanExpression.java:126) [camel-core-2.17.0.redhat-630283.jar:2.17.0.redhat-630283]
at org.apache.camel.language.bean.BeanExpression.evaluate(BeanExpression.java:138) [camel-core-2.17.0.redhat-630283.jar:2.17.0.redhat-630283]
at org.apache.camel.processor.Splitter.createProcessorExchangePairs(Splitter.java:113) [camel-core-2.17.0.redhat-630283.jar:2.17.0.redhat-630283]
at org.apache.camel.processor.MulticastProcessor.process(MulticastProcessor.java:231) [camel-core-2.17.0.redhat-630283.jar:2.17.0.redhat-630283]
at org.apache.camel.processor.Splitter.process(Splitter.java:108) [camel-core-2.17.0.redhat-630283.jar:2.17.0.redhat-630283]
at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77) [camel-core-2.17.0.redhat-630283.jar:2.17.0.redhat-630283]
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:196) [camel-core-2.17.0.redhat-630283.jar:2.17.0.redhat-630283]
at org.apache.camel.processor.Pipeline.process(Pipeline.java:121) [camel-core-2.17.0.redhat-630283.jar:2.17.0.redhat-630283]
at org.apache.camel.processor.Pipeline.process(Pipeline.java:83) [camel-core-2.17.0.redhat-630283.jar:2.17.0.redhat-630283]
at org.apache.camel.processor.TryProcessor.process(TryProcessor.java:113) [camel-core-2.17.0.redhat-630283.jar:2.17.0.redhat-630283]
at org.apache.camel.processor.TryProcessor.process(TryProcessor.java:84) [camel-core-2.17.0.redhat-630283.jar:2.17.0.redhat-630283]
at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77) [camel-core-2.17.0.redhat-630283.jar:2.17.0.redhat-630283]
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:196) [camel-core-2.17.0.redhat-630283.jar:2.17.0.redhat-630283]
at org.apache.camel.processor.Pipeline.process(Pipeline.java:121) [camel-core-2.17.0.redhat-630283.jar:2.17.0.redhat-630283]
at org.apache.camel.processor.Pipeline.process(Pipeline.java:83) [camel-core-2.17.0.redhat-630283.jar:2.17.0.redhat-630283]
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:109) [camel-core-2.17.0.redhat-630283.jar:2.17.0.redhat-630283]
at org.apache.camel.processor.Pipeline.process(Pipeline.java:63) [camel-core-2.17.0.redhat-630283.jar:2.17.0.redhat-630283]
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:171) [camel-core-2.17.0.redhat-630283.jar:2.17.0.redhat-630283]
at org.apache.camel.component.file.GenericFileConsumer.processExchange(GenericFileConsumer.java:454) [camel-core-2.17.0.redhat-630283.jar:2.17.0.redhat-630283]
at org.apache.camel.component.file.remote.RemoteFileConsumer.processExchange(RemoteFileConsumer.java:137) [camel-ftp-2.17.0.redhat-630262.jar:2.17.0.redhat-630262]
at org.apache.camel.component.file.GenericFileConsumer.processBatch(GenericFileConsumer.java:226) [camel-core-2.17.0.redhat-630283.jar:2.17.0.redhat-630283]
at org.apache.camel.component.file.GenericFileConsumer.poll(GenericFileConsumer.java:190) [camel-core-2.17.0.redhat-630283.jar:2.17.0.redhat-630283]
at org.apache.camel.impl.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:175) [camel-core-2.17.0.redhat-630283.jar:2.17.0.redhat-630283]
at org.apache.camel.impl.ScheduledPollConsumer.run(ScheduledPollConsumer.java:102) [camel-core-2.17.0.redhat-630283.jar:2.17.0.redhat-630283]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [rt.jar:1.8.0_151]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [rt.jar:1.8.0_151]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access1(ScheduledThreadPoolExecutor.java:180) [rt.jar:1.8.0_151]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [rt.jar:1.8.0_151]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [rt.jar:1.8.0_151]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [rt.jar:1.8.0_151]
at java.lang.Thread.run(Thread.java:748) [rt.jar:1.8.0_151]
只需要让 Camel 获取消息体作为输入流,或者 String 等
exchange.getIn().getBody(InputStream.class);