Apache camel sftp org.apache.camel.NoTypeConversionAvailableException: 没有可用于转换的类型转换器 java.io.InputStream

Apache camel sftp org.apache.camel.NoTypeConversionAvailableException: No type converter available to convert java.io.InputStream

目前由于类型转换,我无法从 ftp 服务器下载文件。

所以下面提到的路由是从管道调用的,该管道已经将我的自定义 POJO 设置为 body。自定义 pojo 如下:

public class DirectoryLocationListing {

    private String domainName;

    private String countryCode;

    private String productName;

    private String profileId;

    private String directoryLocation;

sftp 的路由如下所示,其中我从特定服务器的目录中获取所有文件并将它们保存在本地文件路径中。

 <route id="route5">
        <from uri="direct:sftpGetCDRs"/>
        <from uri="sftp://{{HOST}}//home/admin/ROSS_PULL_CDR/resellers/${body.directoryLocation}/?noop=true&amp;streamDownload=true&amp;username={{USER}}&amp;password={{PASSWD}}"/>
        <to uri="file://{{DB_DIR_LOC}}/temp/?fileName=${body.directoryLocation}/&amp;autoCreate=true"/>
 </route>

虽然 运行 我的应用程序遇到异常,我觉得我不应该面对下面的异常,因为 from 条目应该获取文件并将其保存在 to 部分,但是 to我放置文件的部分似乎引用了在路由上获取的 body 而不是从 ftp 服务器获取的文件。

---------------------------------------------------------------------------------------------------------------------------------------
RouteId              ProcessorId          Processor                                                                        Elapsed (ms)
[_route1           ] [_route1           ] [quartz2://spring?cron=0+*+*+%3F+*+*                                           ] [       121]
[_route2           ] [_choice1          ] [when[simple{${body.domainName} != null and ${body.countryCode} !=
                                null] [         6]
[_route2           ] [_to4              ] [bean:pullCDRProcessorWithFTP?method=createTemporaryDirectoryWithDirLoc        ] [         1]
[_route2           ] [_log6             ] [log                                                                           ] [         1]
[_route2           ] [_to4              ] [direct:sftpGetCDRs                                                            ] [         3]
[route5            ] [to1               ] [file://C:/example/export/home/resellerid/wmcdr/cdr/temp/?fileName=${body.director] [         2]

Exchange
---------------------------------------------------------------------------------------------------------------------------------------
Exchange[
    Id                  ID-IGL70051-59483-1550660613598-0-7
    ExchangePattern     InOnly
    Headers             {breadcrumbId=ID-IGL70051-59483-1550660613598-0-1, calendar=null, CamelFileName=null, CamelRedelivered=false, CamelRedeliveryCounter=0, fireTime=Wed Feb 20 16:34:00 IST 2019, jobDetail=JobDetail 'Camel_camelContext-f6f63d02-4688-4b3f-b3ca-b206a61cbe9f.spring':  jobClass: 'org.apache.camel.component.quartz2.CamelJob concurrentExectionDisallowed: false persistJobDataAfterExecution: false isDurable: false requestsRecovers: false, jobInstance=org.apache.camel.component.quartz2.CamelJob@65314fe3, jobRunTime=-1, mergedJobDataMap=org.quartz.JobDataMap@f7fbba19, nextFireTime=Wed Feb 20 16:35:00 IST 2019, previousFireTime=null, refireCount=0, result=null, scheduledFireTime=Wed Feb 20 16:34:00 IST 2019, scheduler=org.quartz.impl.StdScheduler@59c9a1bf, trigger=Trigger 'Camel_camelContext-f6f63d02-4688-4b3f-b3ca-b206a61cbe9f.spring':  triggerClass: 'org.quartz.impl.triggers.CronTriggerImpl calendar: 'null' misfireInstruction: 1 nextFireTime: Wed Feb 20 16:35:00 IST 2019, triggerGroup=Camel_camelContext-f6f63d02-4688-4b3f-b3ca-b206a61cbe9f, triggerName=spring}
    BodyType            com.example.cdr.model.DirectoryLocationListing
    Body                com.example.cdr.model.DirectoryLocationListing@23a2ecf4
]

Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
org.apache.camel.component.file.GenericFileOperationFailedException: Cannot store file: C:\EXAMPLE\export\home\resellerid\wmcdr\cdr\temp\IN\healthyindia\apj\SERVICEID
    at org.apache.camel.component.file.FileOperations.storeFile(FileOperations.java:292)
    at org.apache.camel.component.file.GenericFileProducer.writeFile(GenericFileProducer.java:277)
    at org.apache.camel.component.file.GenericFileProducer.processExchange(GenericFileProducer.java:165)
    at org.apache.camel.component.file.GenericFileProducer.process(GenericFileProducer.java:79)
    at org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61)
    at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:129)
    at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:448)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
    at org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:51)
    at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:129)
    at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:448)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:118)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:80)
    at org.apache.camel.processor.ChoiceProcessor.process(ChoiceProcessor.java:111)
    at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:448)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:448)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
    at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:590)
    at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:518)
    at org.apache.camel.processor.MulticastProcessor.process(MulticastProcessor.java:227)
    at org.apache.camel.processor.Splitter.process(Splitter.java:104)
    at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:448)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:118)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:80)
    at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:448)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:118)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:80)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
    at org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:51)
    at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:129)
    at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:448)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:109)
    at org.apache.camel.processor.MulticastProcessor.doProcessParallel(MulticastProcessor.java:736)
    at org.apache.camel.processor.MulticastProcessor.access0(MulticastProcessor.java:83)
    at org.apache.camel.processor.MulticastProcessor.call(MulticastProcessor.java:304)
    at org.apache.camel.processor.MulticastProcessor.call(MulticastProcessor.java:289)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.camel.InvalidPayloadException: No body available of type: java.io.InputStream but has value: com.EXAMPLE.cdr.model.DirectoryLocationListing@5a519bb2 of type: com.EXAMPLE.cdr.model.DirectoryLocationListing on: Message: com.EXAMPLE.cdr.model.DirectoryLocationListing@5a519bb2. Caused by: No type converter available to convert from type: com.EXAMPLE.cdr.model.DirectoryLocationListing to the required type: java.io.InputStream with value com.EXAMPLE.cdr.model.DirectoryLocationListing@5a519bb2. Exchange[Message: com.EXAMPLE.cdr.model.DirectoryLocationListing@5a519bb2]. Caused by: [org.apache.camel.NoTypeConversionAvailableException - No type converter available to convert from type: com.EXAMPLE.cdr.model.DirectoryLocationListing to the required type: java.io.InputStream with value com.EXAMPLE.cdr.model.DirectoryLocationListing@5a519bb2]
    at org.apache.camel.impl.MessageSupport.getMandatoryBody(MessageSupport.java:101)
    at org.apache.camel.component.file.FileOperations.storeFile(FileOperations.java:273)
    ... 52 more
Caused by: org.apache.camel.NoTypeConversionAvailableException: No type converter available to convert from type: com.EXAMPLE.cdr.model.DirectoryLocationListing to the required type: java.io.InputStream with value com.EXAMPLE.cdr.model.DirectoryLocationListing@5a519bb2
    at org.apache.camel.impl.converter.BaseTypeConverterRegistry.mandatoryConvertTo(BaseTypeConverterRegistry.java:177)
    at org.apache.camel.impl.MessageSupport.getMandatoryBody(MessageSupport.java:99)

如果有人body遇到同样的问题,请指导。 所以可能需要类型转换,但我认为在这种情况下不需要它。

**更新**

使用 pollenrich 后我的路线如下:

 <route id="route5">
        <from uri="direct:sftpGetCDRs"/>
        <process ref="sftpGetDirLocation"/>
        <log message="property ${exchangeProperty.ftpGetDirectory}"/>
        <pollEnrich uri="sftp://{{HOST}}//home/admin/ROSS_PULL_CDR/resellers/${exchangeProperty.ftpGetDirectory}/?noop=true&amp;streamDownload=true&amp;username={{USER}}&amp;password={{PASSWD}}" timeout="0"/>
        <to uri="file://{{DB_DIR_LOC}}/temp/?fileName=${body.directoryLocation}/&amp;autoCreate=true"/>
        </route>

我在这里指的处理器是

@Component("sftpGetDirLocation")
public class SFTPGetDirLocation implements Processor{

    public void process(Exchange exchange) throws Exception {
        exchange.setProperty("ftpGetDirectory", exchange.getIn().getBody(DirectoryLocationListing.class).getDirectoryLocation());
    }

}

然而,我的假设是在这个更改之后 pollEnrich 将保留它从 sftp 的 URI 获得的任何内容(大概是文件)并将其保留为 body。 相反,我看到 body 为空

Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId              ProcessorId          Processor                                                                        Elapsed (ms)
[_route1           ] [_route1           ] [quartz2://spring?cron=0+*+*+%3F+*+*                                           ] [       657]
[_route2           ] [_choice1          ] [when[simple{${body.domainName} != null and ${body.countryCode} !=
                                null] [       572]
[_route2           ] [_to4              ] [bean:pullCDRProcessorWithFTP?method=createTemporaryDirectoryWithDirLoc        ] [         2]
[_route2           ] [_log6             ] [log                                                                           ] [         7]
[_route2           ] [_to4              ] [direct:sftpGetCDRs                                                            ] [       523]
[route5            ] [process1          ] [ref:sftpGetDirLocation                                                        ] [         0]
[route5            ] [log1              ] [log                                                                           ] [         1]
[route5            ] [pollEnrich1       ] [pollEnrich[sftp://10.91.142.11//home/admin/ROSS_PULL_CDR/resellers/${exchangeP] [       514]
[route5            ] [log2              ] [log                                                                           ] [         0]
[route5            ] [to1               ] [file://C:/EXAMPLE/export/home/resellerid/wmcdr/cdr/temp/?fileName=${body.director] [         0]

Exchange
---------------------------------------------------------------------------------------------------------------------------------------
Exchange[
    Id                  ID-IGL70051-57920-1550721464331-0-4
    ExchangePattern     InOnly
    Headers             {breadcrumbId=ID-IGL70051-57920-1550721464331-0-1, calendar=null, CamelFileName=null, CamelRedelivered=false, CamelRedeliveryCounter=0, CamelToEndpoint=sftp://10.91.142.11//home/admin/ROSS_PULL_CDR/resellers/$%7BexchangeProperty.ftpGetDirectory%7D/?noop=true&password=tcpip123&streamDownload=true&username=admin, fireTime=Thu Feb 21 09:28:00 IST 2019, jobDetail=JobDetail 'Camel_camelContext-f6f63d02-4688-4b3f-b3ca-b206a61cbe9f.spring':  jobClass: 'org.apache.camel.component.quartz2.CamelJob concurrentExectionDisallowed: false persistJobDataAfterExecution: false isDurable: false requestsRecovers: false, jobInstance=org.apache.camel.component.quartz2.CamelJob@363935fd, jobRunTime=-1, mergedJobDataMap=org.quartz.JobDataMap@f7fbba19, nextFireTime=Thu Feb 21 09:29:00 IST 2019, previousFireTime=null, refireCount=0, result=null, scheduledFireTime=Thu Feb 21 09:28:00 IST 2019, scheduler=org.quartz.impl.StdScheduler@5bf7fd08, trigger=Trigger 'Camel_camelContext-f6f63d02-4688-4b3f-b3ca-b206a61cbe9f.spring':  triggerClass: 'org.quartz.impl.triggers.CronTriggerImpl calendar: 'null' misfireInstruction: 1 nextFireTime: Thu Feb 21 09:29:00 IST 2019, triggerGroup=Camel_camelContext-f6f63d02-4688-4b3f-b3ca-b206a61cbe9f, triggerName=spring}
    BodyType            null
    Body                [Body is null]
]

因此例外是

Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
org.apache.camel.component.file.GenericFileOperationFailedException: Cannot write null body to file: C:\EXAMPLE\export\home\resellerid\wmcdr\cdr\temp
    at org.apache.camel.component.file.FileOperations.storeFile(FileOperations.java:205)

我看到代码中存在三个问题。

  1. 您正在使用 DirectoryLocationListing 类型的 body 向 route5 发送交换,并且您希望 sftp 组件将该消息与第二个 from 定义。它没有用。当 SFTP 组件下载文件时,交换器上的 body 将被真实文件(org.apache.camel.component.file.GenericFile 类型)替换,您将丢失 DirectoryLocationListing object 你在之前的交流中。 Content Enriher EIP 是一个不错的选择,如果你的 pojo cam 设置在其他地方,而不是 body.

  2. 在当前情况下,在路由的尽头,您希望从 sftp 端点收到文件,但实际交付给您的是与 body 的交换DirectoryLocationListing 类型(您的 SFTP 组件不工作)。 Camel 无法施展魔法并将其保存到磁盘。这正是它所抱怨的。

    2.1。一旦你解决了这个问题(并且你的 SFTP 组件开始工作),你将 运行 进入下一个情况,即传递到你的 to 端点的消息现在是一个文件,你需要以某种方式保持 directoryLocation 在交换中直到路线结束。

  3. fileName 定义中的 to 参数指向一个目录。它不会工作。您必须使用动态端点定义

我不熟悉 Camel 的 XML DSL。因此,我将展示如何使用 Java DSL 完成此操作。我将避免使用 SFTP 来代替 brewity,而是使用 file 组件。我们将尝试一次性解决所有这些问题。

路由定义

from("direct:test")
            .routeId("route5")
            .log("=> body.directoryLocation is: ${body.directoryLocation}") //Just to see if we can read the directoryLocation property from the POJO
            .process(exchange -> exchange.setProperty("WriteTargetDirectory",exchange.getIn().getBody(DirectoryListing.class).directoryLocation)) //Move directoryLocation property to an exchange property  named WriteTargetDirectory
            .pollEnrich()
            .simple("file://source/${exchangeProperty.WriteTargetDirectory}/?noop=true")//Poll enrich magic here!
            .log("=> I still have the target directory location as : ${exchangeProperty.WriteTargetDirectory}")
            .log("=> I just received file [${in.header." + Exchange.FILE_NAME + "}] and will write it as [${exchangeProperty.WriteTargetDirectory}/${in.header." + Exchange.FILE_NAME + "}]" )
            .toD("file://destination/${exchangeProperty.WriteTargetDirectory}/");

DirectoryListing class

public class DirectoryListing {
        String directoryLocation;
        public DirectoryListing(String directoryLocation) {
            this.directoryLocation = directoryLocation;
        }
        public String getDirectoryLocation() {
            return directoryLocation;
        }
}

路由测试代码

template.sendBody("direct:test",new DirectoryListing("IN/healthyindia/"));

测试日志

[                          main] DefaultCamelContext            INFO  Apache Camel 2.23.1 (CamelContext: camel-1) started in 0.203 seconds
[                          main] route5                         INFO  => body.directoryLocation is: IN/healthyindia/
[                          main] FileEndpoint                   INFO  Endpoint is configured with noop=true so forcing endpoint to be idempotent as well
[                          main] FileEndpoint                   INFO  Using default memory based idempotent repository with cache max size: 1000
[                          main] route5                         INFO  => I still have the target directory location as : IN/healthyindia/
[                          main] route5                         INFO  => I just received file [test.txt] and will write it as [destination/IN/healthyindia//test.txt]
[                          main] MainRouteBuilderTest           INFO  ********************************************************************************
[                          main] MainRouteBuilderTest           INFO  Testing done: testRouteBuilder(JustTest.MainRouteBuilderTest)
[                          main] MainRouteBuilderTest           INFO  Took: 0.125 seconds (125 millis)

最后,在 shelldragon 的指导下,以下骆驼上下文路线对我有用:

 <route id="route5">
            <from id="_from5" uri="direct:sftpGetCDRs"/>
            <process id="_process1" ref="sftpGetDirLocation"/>
            <log id="_log17" message="property ${exchangeProperty.ftpGetDirectory}"/>
            <pollEnrich id="_pollEnrich1" timeout="0" uri="sftp://{{HOST}}//home/admin/ROSS_PULL_CDR/resellers/temp/${body.directoryLocation}/?consumer.delay=60000&amp;username={{USER}}&amp;password={{PASSWD}}"/>
            <log id="_log18" message="${body}"/>
            <to id="_to2" uri="file://{{DB_DIR_LOC}}/temp/?fileName=${exchangeProperty.ftpGetDirectory}&amp;autoCreate=true"/>
        </route>