Apache camel sftp org.apache.camel.NoTypeConversionAvailableException: 没有可用于转换的类型转换器 java.io.InputStream
Apache camel sftp org.apache.camel.NoTypeConversionAvailableException: No type converter available to convert java.io.InputStream
目前由于类型转换,我无法从 ftp 服务器下载文件。
所以下面提到的路由是从管道调用的,该管道已经将我的自定义 POJO 设置为 body。自定义 pojo 如下:
public class DirectoryLocationListing {
private String domainName;
private String countryCode;
private String productName;
private String profileId;
private String directoryLocation;
sftp 的路由如下所示,其中我从特定服务器的目录中获取所有文件并将它们保存在本地文件路径中。
<route id="route5">
<from uri="direct:sftpGetCDRs"/>
<from uri="sftp://{{HOST}}//home/admin/ROSS_PULL_CDR/resellers/${body.directoryLocation}/?noop=true&streamDownload=true&username={{USER}}&password={{PASSWD}}"/>
<to uri="file://{{DB_DIR_LOC}}/temp/?fileName=${body.directoryLocation}/&autoCreate=true"/>
</route>
虽然 运行 我的应用程序遇到异常,我觉得我不应该面对下面的异常,因为 from 条目应该获取文件并将其保存在 to 部分,但是 to我放置文件的部分似乎引用了在路由上获取的 body 而不是从 ftp 服务器获取的文件。
---------------------------------------------------------------------------------------------------------------------------------------
RouteId ProcessorId Processor Elapsed (ms)
[_route1 ] [_route1 ] [quartz2://spring?cron=0+*+*+%3F+*+* ] [ 121]
[_route2 ] [_choice1 ] [when[simple{${body.domainName} != null and ${body.countryCode} !=
null] [ 6]
[_route2 ] [_to4 ] [bean:pullCDRProcessorWithFTP?method=createTemporaryDirectoryWithDirLoc ] [ 1]
[_route2 ] [_log6 ] [log ] [ 1]
[_route2 ] [_to4 ] [direct:sftpGetCDRs ] [ 3]
[route5 ] [to1 ] [file://C:/example/export/home/resellerid/wmcdr/cdr/temp/?fileName=${body.director] [ 2]
Exchange
---------------------------------------------------------------------------------------------------------------------------------------
Exchange[
Id ID-IGL70051-59483-1550660613598-0-7
ExchangePattern InOnly
Headers {breadcrumbId=ID-IGL70051-59483-1550660613598-0-1, calendar=null, CamelFileName=null, CamelRedelivered=false, CamelRedeliveryCounter=0, fireTime=Wed Feb 20 16:34:00 IST 2019, jobDetail=JobDetail 'Camel_camelContext-f6f63d02-4688-4b3f-b3ca-b206a61cbe9f.spring': jobClass: 'org.apache.camel.component.quartz2.CamelJob concurrentExectionDisallowed: false persistJobDataAfterExecution: false isDurable: false requestsRecovers: false, jobInstance=org.apache.camel.component.quartz2.CamelJob@65314fe3, jobRunTime=-1, mergedJobDataMap=org.quartz.JobDataMap@f7fbba19, nextFireTime=Wed Feb 20 16:35:00 IST 2019, previousFireTime=null, refireCount=0, result=null, scheduledFireTime=Wed Feb 20 16:34:00 IST 2019, scheduler=org.quartz.impl.StdScheduler@59c9a1bf, trigger=Trigger 'Camel_camelContext-f6f63d02-4688-4b3f-b3ca-b206a61cbe9f.spring': triggerClass: 'org.quartz.impl.triggers.CronTriggerImpl calendar: 'null' misfireInstruction: 1 nextFireTime: Wed Feb 20 16:35:00 IST 2019, triggerGroup=Camel_camelContext-f6f63d02-4688-4b3f-b3ca-b206a61cbe9f, triggerName=spring}
BodyType com.example.cdr.model.DirectoryLocationListing
Body com.example.cdr.model.DirectoryLocationListing@23a2ecf4
]
Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
org.apache.camel.component.file.GenericFileOperationFailedException: Cannot store file: C:\EXAMPLE\export\home\resellerid\wmcdr\cdr\temp\IN\healthyindia\apj\SERVICEID
at org.apache.camel.component.file.FileOperations.storeFile(FileOperations.java:292)
at org.apache.camel.component.file.GenericFileProducer.writeFile(GenericFileProducer.java:277)
at org.apache.camel.component.file.GenericFileProducer.processExchange(GenericFileProducer.java:165)
at org.apache.camel.component.file.GenericFileProducer.process(GenericFileProducer.java:79)
at org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61)
at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:129)
at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:448)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
at org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:51)
at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:129)
at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:448)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:118)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:80)
at org.apache.camel.processor.ChoiceProcessor.process(ChoiceProcessor.java:111)
at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:448)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:448)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:590)
at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:518)
at org.apache.camel.processor.MulticastProcessor.process(MulticastProcessor.java:227)
at org.apache.camel.processor.Splitter.process(Splitter.java:104)
at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:448)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:118)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:80)
at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:448)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:118)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:80)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
at org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:51)
at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:129)
at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:448)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:109)
at org.apache.camel.processor.MulticastProcessor.doProcessParallel(MulticastProcessor.java:736)
at org.apache.camel.processor.MulticastProcessor.access0(MulticastProcessor.java:83)
at org.apache.camel.processor.MulticastProcessor.call(MulticastProcessor.java:304)
at org.apache.camel.processor.MulticastProcessor.call(MulticastProcessor.java:289)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.camel.InvalidPayloadException: No body available of type: java.io.InputStream but has value: com.EXAMPLE.cdr.model.DirectoryLocationListing@5a519bb2 of type: com.EXAMPLE.cdr.model.DirectoryLocationListing on: Message: com.EXAMPLE.cdr.model.DirectoryLocationListing@5a519bb2. Caused by: No type converter available to convert from type: com.EXAMPLE.cdr.model.DirectoryLocationListing to the required type: java.io.InputStream with value com.EXAMPLE.cdr.model.DirectoryLocationListing@5a519bb2. Exchange[Message: com.EXAMPLE.cdr.model.DirectoryLocationListing@5a519bb2]. Caused by: [org.apache.camel.NoTypeConversionAvailableException - No type converter available to convert from type: com.EXAMPLE.cdr.model.DirectoryLocationListing to the required type: java.io.InputStream with value com.EXAMPLE.cdr.model.DirectoryLocationListing@5a519bb2]
at org.apache.camel.impl.MessageSupport.getMandatoryBody(MessageSupport.java:101)
at org.apache.camel.component.file.FileOperations.storeFile(FileOperations.java:273)
... 52 more
Caused by: org.apache.camel.NoTypeConversionAvailableException: No type converter available to convert from type: com.EXAMPLE.cdr.model.DirectoryLocationListing to the required type: java.io.InputStream with value com.EXAMPLE.cdr.model.DirectoryLocationListing@5a519bb2
at org.apache.camel.impl.converter.BaseTypeConverterRegistry.mandatoryConvertTo(BaseTypeConverterRegistry.java:177)
at org.apache.camel.impl.MessageSupport.getMandatoryBody(MessageSupport.java:99)
如果有人body遇到同样的问题,请指导。
所以可能需要类型转换,但我认为在这种情况下不需要它。
**更新**
使用 pollenrich 后我的路线如下:
<route id="route5">
<from uri="direct:sftpGetCDRs"/>
<process ref="sftpGetDirLocation"/>
<log message="property ${exchangeProperty.ftpGetDirectory}"/>
<pollEnrich uri="sftp://{{HOST}}//home/admin/ROSS_PULL_CDR/resellers/${exchangeProperty.ftpGetDirectory}/?noop=true&streamDownload=true&username={{USER}}&password={{PASSWD}}" timeout="0"/>
<to uri="file://{{DB_DIR_LOC}}/temp/?fileName=${body.directoryLocation}/&autoCreate=true"/>
</route>
我在这里指的处理器是
@Component("sftpGetDirLocation")
public class SFTPGetDirLocation implements Processor{
public void process(Exchange exchange) throws Exception {
exchange.setProperty("ftpGetDirectory", exchange.getIn().getBody(DirectoryLocationListing.class).getDirectoryLocation());
}
}
然而,我的假设是在这个更改之后 pollEnrich 将保留它从 sftp 的 URI 获得的任何内容(大概是文件)并将其保留为 body。
相反,我看到 body 为空
Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId ProcessorId Processor Elapsed (ms)
[_route1 ] [_route1 ] [quartz2://spring?cron=0+*+*+%3F+*+* ] [ 657]
[_route2 ] [_choice1 ] [when[simple{${body.domainName} != null and ${body.countryCode} !=
null] [ 572]
[_route2 ] [_to4 ] [bean:pullCDRProcessorWithFTP?method=createTemporaryDirectoryWithDirLoc ] [ 2]
[_route2 ] [_log6 ] [log ] [ 7]
[_route2 ] [_to4 ] [direct:sftpGetCDRs ] [ 523]
[route5 ] [process1 ] [ref:sftpGetDirLocation ] [ 0]
[route5 ] [log1 ] [log ] [ 1]
[route5 ] [pollEnrich1 ] [pollEnrich[sftp://10.91.142.11//home/admin/ROSS_PULL_CDR/resellers/${exchangeP] [ 514]
[route5 ] [log2 ] [log ] [ 0]
[route5 ] [to1 ] [file://C:/EXAMPLE/export/home/resellerid/wmcdr/cdr/temp/?fileName=${body.director] [ 0]
Exchange
---------------------------------------------------------------------------------------------------------------------------------------
Exchange[
Id ID-IGL70051-57920-1550721464331-0-4
ExchangePattern InOnly
Headers {breadcrumbId=ID-IGL70051-57920-1550721464331-0-1, calendar=null, CamelFileName=null, CamelRedelivered=false, CamelRedeliveryCounter=0, CamelToEndpoint=sftp://10.91.142.11//home/admin/ROSS_PULL_CDR/resellers/$%7BexchangeProperty.ftpGetDirectory%7D/?noop=true&password=tcpip123&streamDownload=true&username=admin, fireTime=Thu Feb 21 09:28:00 IST 2019, jobDetail=JobDetail 'Camel_camelContext-f6f63d02-4688-4b3f-b3ca-b206a61cbe9f.spring': jobClass: 'org.apache.camel.component.quartz2.CamelJob concurrentExectionDisallowed: false persistJobDataAfterExecution: false isDurable: false requestsRecovers: false, jobInstance=org.apache.camel.component.quartz2.CamelJob@363935fd, jobRunTime=-1, mergedJobDataMap=org.quartz.JobDataMap@f7fbba19, nextFireTime=Thu Feb 21 09:29:00 IST 2019, previousFireTime=null, refireCount=0, result=null, scheduledFireTime=Thu Feb 21 09:28:00 IST 2019, scheduler=org.quartz.impl.StdScheduler@5bf7fd08, trigger=Trigger 'Camel_camelContext-f6f63d02-4688-4b3f-b3ca-b206a61cbe9f.spring': triggerClass: 'org.quartz.impl.triggers.CronTriggerImpl calendar: 'null' misfireInstruction: 1 nextFireTime: Thu Feb 21 09:29:00 IST 2019, triggerGroup=Camel_camelContext-f6f63d02-4688-4b3f-b3ca-b206a61cbe9f, triggerName=spring}
BodyType null
Body [Body is null]
]
因此例外是
Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
org.apache.camel.component.file.GenericFileOperationFailedException: Cannot write null body to file: C:\EXAMPLE\export\home\resellerid\wmcdr\cdr\temp
at org.apache.camel.component.file.FileOperations.storeFile(FileOperations.java:205)
我看到代码中存在三个问题。
您正在使用 DirectoryLocationListing
类型的 body 向 route5
发送交换,并且您希望 sftp
组件将该消息与第二个 from
定义。它没有用。当 SFTP
组件下载文件时,交换器上的 body 将被真实文件(org.apache.camel.component.file.GenericFile
类型)替换,您将丢失 DirectoryLocationListing
object 你在之前的交流中。 Content Enriher EIP 是一个不错的选择,如果你的 pojo cam 设置在其他地方,而不是 body.
在当前情况下,在路由的尽头,您希望从 sftp
端点收到文件,但实际交付给您的是与 body 的交换DirectoryLocationListing
类型(您的 SFTP 组件不工作)。 Camel 无法施展魔法并将其保存到磁盘。这正是它所抱怨的。
2.1。一旦你解决了这个问题(并且你的 SFTP 组件开始工作),你将 运行 进入下一个情况,即传递到你的 to
端点的消息现在是一个文件,你需要以某种方式保持 directoryLocation
在交换中直到路线结束。
fileName
定义中的 to
参数指向一个目录。它不会工作。您必须使用动态端点定义
我不熟悉 Camel 的 XML DSL。因此,我将展示如何使用 Java DSL 完成此操作。我将避免使用 SFTP 来代替 brewity,而是使用 file
组件。我们将尝试一次性解决所有这些问题。
路由定义
from("direct:test")
.routeId("route5")
.log("=> body.directoryLocation is: ${body.directoryLocation}") //Just to see if we can read the directoryLocation property from the POJO
.process(exchange -> exchange.setProperty("WriteTargetDirectory",exchange.getIn().getBody(DirectoryListing.class).directoryLocation)) //Move directoryLocation property to an exchange property named WriteTargetDirectory
.pollEnrich()
.simple("file://source/${exchangeProperty.WriteTargetDirectory}/?noop=true")//Poll enrich magic here!
.log("=> I still have the target directory location as : ${exchangeProperty.WriteTargetDirectory}")
.log("=> I just received file [${in.header." + Exchange.FILE_NAME + "}] and will write it as [${exchangeProperty.WriteTargetDirectory}/${in.header." + Exchange.FILE_NAME + "}]" )
.toD("file://destination/${exchangeProperty.WriteTargetDirectory}/");
DirectoryListing
class
public class DirectoryListing {
String directoryLocation;
public DirectoryListing(String directoryLocation) {
this.directoryLocation = directoryLocation;
}
public String getDirectoryLocation() {
return directoryLocation;
}
}
路由测试代码
template.sendBody("direct:test",new DirectoryListing("IN/healthyindia/"));
测试日志
[ main] DefaultCamelContext INFO Apache Camel 2.23.1 (CamelContext: camel-1) started in 0.203 seconds
[ main] route5 INFO => body.directoryLocation is: IN/healthyindia/
[ main] FileEndpoint INFO Endpoint is configured with noop=true so forcing endpoint to be idempotent as well
[ main] FileEndpoint INFO Using default memory based idempotent repository with cache max size: 1000
[ main] route5 INFO => I still have the target directory location as : IN/healthyindia/
[ main] route5 INFO => I just received file [test.txt] and will write it as [destination/IN/healthyindia//test.txt]
[ main] MainRouteBuilderTest INFO ********************************************************************************
[ main] MainRouteBuilderTest INFO Testing done: testRouteBuilder(JustTest.MainRouteBuilderTest)
[ main] MainRouteBuilderTest INFO Took: 0.125 seconds (125 millis)
最后,在 shelldragon 的指导下,以下骆驼上下文路线对我有用:
<route id="route5">
<from id="_from5" uri="direct:sftpGetCDRs"/>
<process id="_process1" ref="sftpGetDirLocation"/>
<log id="_log17" message="property ${exchangeProperty.ftpGetDirectory}"/>
<pollEnrich id="_pollEnrich1" timeout="0" uri="sftp://{{HOST}}//home/admin/ROSS_PULL_CDR/resellers/temp/${body.directoryLocation}/?consumer.delay=60000&username={{USER}}&password={{PASSWD}}"/>
<log id="_log18" message="${body}"/>
<to id="_to2" uri="file://{{DB_DIR_LOC}}/temp/?fileName=${exchangeProperty.ftpGetDirectory}&autoCreate=true"/>
</route>
目前由于类型转换,我无法从 ftp 服务器下载文件。
所以下面提到的路由是从管道调用的,该管道已经将我的自定义 POJO 设置为 body。自定义 pojo 如下:
public class DirectoryLocationListing {
private String domainName;
private String countryCode;
private String productName;
private String profileId;
private String directoryLocation;
sftp 的路由如下所示,其中我从特定服务器的目录中获取所有文件并将它们保存在本地文件路径中。
<route id="route5">
<from uri="direct:sftpGetCDRs"/>
<from uri="sftp://{{HOST}}//home/admin/ROSS_PULL_CDR/resellers/${body.directoryLocation}/?noop=true&streamDownload=true&username={{USER}}&password={{PASSWD}}"/>
<to uri="file://{{DB_DIR_LOC}}/temp/?fileName=${body.directoryLocation}/&autoCreate=true"/>
</route>
虽然 运行 我的应用程序遇到异常,我觉得我不应该面对下面的异常,因为 from 条目应该获取文件并将其保存在 to 部分,但是 to我放置文件的部分似乎引用了在路由上获取的 body 而不是从 ftp 服务器获取的文件。
---------------------------------------------------------------------------------------------------------------------------------------
RouteId ProcessorId Processor Elapsed (ms)
[_route1 ] [_route1 ] [quartz2://spring?cron=0+*+*+%3F+*+* ] [ 121]
[_route2 ] [_choice1 ] [when[simple{${body.domainName} != null and ${body.countryCode} !=
null] [ 6]
[_route2 ] [_to4 ] [bean:pullCDRProcessorWithFTP?method=createTemporaryDirectoryWithDirLoc ] [ 1]
[_route2 ] [_log6 ] [log ] [ 1]
[_route2 ] [_to4 ] [direct:sftpGetCDRs ] [ 3]
[route5 ] [to1 ] [file://C:/example/export/home/resellerid/wmcdr/cdr/temp/?fileName=${body.director] [ 2]
Exchange
---------------------------------------------------------------------------------------------------------------------------------------
Exchange[
Id ID-IGL70051-59483-1550660613598-0-7
ExchangePattern InOnly
Headers {breadcrumbId=ID-IGL70051-59483-1550660613598-0-1, calendar=null, CamelFileName=null, CamelRedelivered=false, CamelRedeliveryCounter=0, fireTime=Wed Feb 20 16:34:00 IST 2019, jobDetail=JobDetail 'Camel_camelContext-f6f63d02-4688-4b3f-b3ca-b206a61cbe9f.spring': jobClass: 'org.apache.camel.component.quartz2.CamelJob concurrentExectionDisallowed: false persistJobDataAfterExecution: false isDurable: false requestsRecovers: false, jobInstance=org.apache.camel.component.quartz2.CamelJob@65314fe3, jobRunTime=-1, mergedJobDataMap=org.quartz.JobDataMap@f7fbba19, nextFireTime=Wed Feb 20 16:35:00 IST 2019, previousFireTime=null, refireCount=0, result=null, scheduledFireTime=Wed Feb 20 16:34:00 IST 2019, scheduler=org.quartz.impl.StdScheduler@59c9a1bf, trigger=Trigger 'Camel_camelContext-f6f63d02-4688-4b3f-b3ca-b206a61cbe9f.spring': triggerClass: 'org.quartz.impl.triggers.CronTriggerImpl calendar: 'null' misfireInstruction: 1 nextFireTime: Wed Feb 20 16:35:00 IST 2019, triggerGroup=Camel_camelContext-f6f63d02-4688-4b3f-b3ca-b206a61cbe9f, triggerName=spring}
BodyType com.example.cdr.model.DirectoryLocationListing
Body com.example.cdr.model.DirectoryLocationListing@23a2ecf4
]
Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
org.apache.camel.component.file.GenericFileOperationFailedException: Cannot store file: C:\EXAMPLE\export\home\resellerid\wmcdr\cdr\temp\IN\healthyindia\apj\SERVICEID
at org.apache.camel.component.file.FileOperations.storeFile(FileOperations.java:292)
at org.apache.camel.component.file.GenericFileProducer.writeFile(GenericFileProducer.java:277)
at org.apache.camel.component.file.GenericFileProducer.processExchange(GenericFileProducer.java:165)
at org.apache.camel.component.file.GenericFileProducer.process(GenericFileProducer.java:79)
at org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61)
at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:129)
at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:448)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
at org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:51)
at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:129)
at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:448)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:118)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:80)
at org.apache.camel.processor.ChoiceProcessor.process(ChoiceProcessor.java:111)
at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:448)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:448)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:590)
at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:518)
at org.apache.camel.processor.MulticastProcessor.process(MulticastProcessor.java:227)
at org.apache.camel.processor.Splitter.process(Splitter.java:104)
at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:448)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:118)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:80)
at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:448)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:118)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:80)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
at org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:51)
at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:129)
at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:448)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:109)
at org.apache.camel.processor.MulticastProcessor.doProcessParallel(MulticastProcessor.java:736)
at org.apache.camel.processor.MulticastProcessor.access0(MulticastProcessor.java:83)
at org.apache.camel.processor.MulticastProcessor.call(MulticastProcessor.java:304)
at org.apache.camel.processor.MulticastProcessor.call(MulticastProcessor.java:289)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.camel.InvalidPayloadException: No body available of type: java.io.InputStream but has value: com.EXAMPLE.cdr.model.DirectoryLocationListing@5a519bb2 of type: com.EXAMPLE.cdr.model.DirectoryLocationListing on: Message: com.EXAMPLE.cdr.model.DirectoryLocationListing@5a519bb2. Caused by: No type converter available to convert from type: com.EXAMPLE.cdr.model.DirectoryLocationListing to the required type: java.io.InputStream with value com.EXAMPLE.cdr.model.DirectoryLocationListing@5a519bb2. Exchange[Message: com.EXAMPLE.cdr.model.DirectoryLocationListing@5a519bb2]. Caused by: [org.apache.camel.NoTypeConversionAvailableException - No type converter available to convert from type: com.EXAMPLE.cdr.model.DirectoryLocationListing to the required type: java.io.InputStream with value com.EXAMPLE.cdr.model.DirectoryLocationListing@5a519bb2]
at org.apache.camel.impl.MessageSupport.getMandatoryBody(MessageSupport.java:101)
at org.apache.camel.component.file.FileOperations.storeFile(FileOperations.java:273)
... 52 more
Caused by: org.apache.camel.NoTypeConversionAvailableException: No type converter available to convert from type: com.EXAMPLE.cdr.model.DirectoryLocationListing to the required type: java.io.InputStream with value com.EXAMPLE.cdr.model.DirectoryLocationListing@5a519bb2
at org.apache.camel.impl.converter.BaseTypeConverterRegistry.mandatoryConvertTo(BaseTypeConverterRegistry.java:177)
at org.apache.camel.impl.MessageSupport.getMandatoryBody(MessageSupport.java:99)
如果有人body遇到同样的问题,请指导。 所以可能需要类型转换,但我认为在这种情况下不需要它。
**更新**
使用 pollenrich 后我的路线如下:
<route id="route5">
<from uri="direct:sftpGetCDRs"/>
<process ref="sftpGetDirLocation"/>
<log message="property ${exchangeProperty.ftpGetDirectory}"/>
<pollEnrich uri="sftp://{{HOST}}//home/admin/ROSS_PULL_CDR/resellers/${exchangeProperty.ftpGetDirectory}/?noop=true&streamDownload=true&username={{USER}}&password={{PASSWD}}" timeout="0"/>
<to uri="file://{{DB_DIR_LOC}}/temp/?fileName=${body.directoryLocation}/&autoCreate=true"/>
</route>
我在这里指的处理器是
@Component("sftpGetDirLocation")
public class SFTPGetDirLocation implements Processor{
public void process(Exchange exchange) throws Exception {
exchange.setProperty("ftpGetDirectory", exchange.getIn().getBody(DirectoryLocationListing.class).getDirectoryLocation());
}
}
然而,我的假设是在这个更改之后 pollEnrich 将保留它从 sftp 的 URI 获得的任何内容(大概是文件)并将其保留为 body。 相反,我看到 body 为空
Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId ProcessorId Processor Elapsed (ms)
[_route1 ] [_route1 ] [quartz2://spring?cron=0+*+*+%3F+*+* ] [ 657]
[_route2 ] [_choice1 ] [when[simple{${body.domainName} != null and ${body.countryCode} !=
null] [ 572]
[_route2 ] [_to4 ] [bean:pullCDRProcessorWithFTP?method=createTemporaryDirectoryWithDirLoc ] [ 2]
[_route2 ] [_log6 ] [log ] [ 7]
[_route2 ] [_to4 ] [direct:sftpGetCDRs ] [ 523]
[route5 ] [process1 ] [ref:sftpGetDirLocation ] [ 0]
[route5 ] [log1 ] [log ] [ 1]
[route5 ] [pollEnrich1 ] [pollEnrich[sftp://10.91.142.11//home/admin/ROSS_PULL_CDR/resellers/${exchangeP] [ 514]
[route5 ] [log2 ] [log ] [ 0]
[route5 ] [to1 ] [file://C:/EXAMPLE/export/home/resellerid/wmcdr/cdr/temp/?fileName=${body.director] [ 0]
Exchange
---------------------------------------------------------------------------------------------------------------------------------------
Exchange[
Id ID-IGL70051-57920-1550721464331-0-4
ExchangePattern InOnly
Headers {breadcrumbId=ID-IGL70051-57920-1550721464331-0-1, calendar=null, CamelFileName=null, CamelRedelivered=false, CamelRedeliveryCounter=0, CamelToEndpoint=sftp://10.91.142.11//home/admin/ROSS_PULL_CDR/resellers/$%7BexchangeProperty.ftpGetDirectory%7D/?noop=true&password=tcpip123&streamDownload=true&username=admin, fireTime=Thu Feb 21 09:28:00 IST 2019, jobDetail=JobDetail 'Camel_camelContext-f6f63d02-4688-4b3f-b3ca-b206a61cbe9f.spring': jobClass: 'org.apache.camel.component.quartz2.CamelJob concurrentExectionDisallowed: false persistJobDataAfterExecution: false isDurable: false requestsRecovers: false, jobInstance=org.apache.camel.component.quartz2.CamelJob@363935fd, jobRunTime=-1, mergedJobDataMap=org.quartz.JobDataMap@f7fbba19, nextFireTime=Thu Feb 21 09:29:00 IST 2019, previousFireTime=null, refireCount=0, result=null, scheduledFireTime=Thu Feb 21 09:28:00 IST 2019, scheduler=org.quartz.impl.StdScheduler@5bf7fd08, trigger=Trigger 'Camel_camelContext-f6f63d02-4688-4b3f-b3ca-b206a61cbe9f.spring': triggerClass: 'org.quartz.impl.triggers.CronTriggerImpl calendar: 'null' misfireInstruction: 1 nextFireTime: Thu Feb 21 09:29:00 IST 2019, triggerGroup=Camel_camelContext-f6f63d02-4688-4b3f-b3ca-b206a61cbe9f, triggerName=spring}
BodyType null
Body [Body is null]
]
因此例外是
Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
org.apache.camel.component.file.GenericFileOperationFailedException: Cannot write null body to file: C:\EXAMPLE\export\home\resellerid\wmcdr\cdr\temp
at org.apache.camel.component.file.FileOperations.storeFile(FileOperations.java:205)
我看到代码中存在三个问题。
您正在使用
DirectoryLocationListing
类型的 body 向route5
发送交换,并且您希望sftp
组件将该消息与第二个from
定义。它没有用。当SFTP
组件下载文件时,交换器上的 body 将被真实文件(org.apache.camel.component.file.GenericFile
类型)替换,您将丢失DirectoryLocationListing
object 你在之前的交流中。 Content Enriher EIP 是一个不错的选择,如果你的 pojo cam 设置在其他地方,而不是 body.在当前情况下,在路由的尽头,您希望从
sftp
端点收到文件,但实际交付给您的是与 body 的交换DirectoryLocationListing
类型(您的 SFTP 组件不工作)。 Camel 无法施展魔法并将其保存到磁盘。这正是它所抱怨的。2.1。一旦你解决了这个问题(并且你的 SFTP 组件开始工作),你将 运行 进入下一个情况,即传递到你的
to
端点的消息现在是一个文件,你需要以某种方式保持directoryLocation
在交换中直到路线结束。fileName
定义中的to
参数指向一个目录。它不会工作。您必须使用动态端点定义
我不熟悉 Camel 的 XML DSL。因此,我将展示如何使用 Java DSL 完成此操作。我将避免使用 SFTP 来代替 brewity,而是使用 file
组件。我们将尝试一次性解决所有这些问题。
路由定义
from("direct:test")
.routeId("route5")
.log("=> body.directoryLocation is: ${body.directoryLocation}") //Just to see if we can read the directoryLocation property from the POJO
.process(exchange -> exchange.setProperty("WriteTargetDirectory",exchange.getIn().getBody(DirectoryListing.class).directoryLocation)) //Move directoryLocation property to an exchange property named WriteTargetDirectory
.pollEnrich()
.simple("file://source/${exchangeProperty.WriteTargetDirectory}/?noop=true")//Poll enrich magic here!
.log("=> I still have the target directory location as : ${exchangeProperty.WriteTargetDirectory}")
.log("=> I just received file [${in.header." + Exchange.FILE_NAME + "}] and will write it as [${exchangeProperty.WriteTargetDirectory}/${in.header." + Exchange.FILE_NAME + "}]" )
.toD("file://destination/${exchangeProperty.WriteTargetDirectory}/");
DirectoryListing
class
public class DirectoryListing {
String directoryLocation;
public DirectoryListing(String directoryLocation) {
this.directoryLocation = directoryLocation;
}
public String getDirectoryLocation() {
return directoryLocation;
}
}
路由测试代码
template.sendBody("direct:test",new DirectoryListing("IN/healthyindia/"));
测试日志
[ main] DefaultCamelContext INFO Apache Camel 2.23.1 (CamelContext: camel-1) started in 0.203 seconds
[ main] route5 INFO => body.directoryLocation is: IN/healthyindia/
[ main] FileEndpoint INFO Endpoint is configured with noop=true so forcing endpoint to be idempotent as well
[ main] FileEndpoint INFO Using default memory based idempotent repository with cache max size: 1000
[ main] route5 INFO => I still have the target directory location as : IN/healthyindia/
[ main] route5 INFO => I just received file [test.txt] and will write it as [destination/IN/healthyindia//test.txt]
[ main] MainRouteBuilderTest INFO ********************************************************************************
[ main] MainRouteBuilderTest INFO Testing done: testRouteBuilder(JustTest.MainRouteBuilderTest)
[ main] MainRouteBuilderTest INFO Took: 0.125 seconds (125 millis)
最后,在 shelldragon 的指导下,以下骆驼上下文路线对我有用:
<route id="route5">
<from id="_from5" uri="direct:sftpGetCDRs"/>
<process id="_process1" ref="sftpGetDirLocation"/>
<log id="_log17" message="property ${exchangeProperty.ftpGetDirectory}"/>
<pollEnrich id="_pollEnrich1" timeout="0" uri="sftp://{{HOST}}//home/admin/ROSS_PULL_CDR/resellers/temp/${body.directoryLocation}/?consumer.delay=60000&username={{USER}}&password={{PASSWD}}"/>
<log id="_log18" message="${body}"/>
<to id="_to2" uri="file://{{DB_DIR_LOC}}/temp/?fileName=${exchangeProperty.ftpGetDirectory}&autoCreate=true"/>
</route>