Apache Camel dynamic endpoints

I have started learning Apache Camel and have run into a problem.

I need to read an XML file from the file system, parse it, and transfer the files specified in that XML to another location.


Here is an example of the XML, which is located in "C:/Users/JuISe/Desktop/jms".

<file>
    <from>C:/Users/JuISe/Desktop/from</from>
    <to>C:/Users/JuISe/Desktop/to</to>
</file>

This means: transfer the files from the "C:/Users/JuISe/Desktop/from" directory to "C:/Users/JuISe/Desktop/to".

Here is my code:

public class FileShifter {
    public static void main(String args[]) {
        CamelContext context = new DefaultCamelContext();
        try {
            context.addRoutes(new MyRouteBuilder());
            context.start();
            Thread.sleep(10000);
            context.stop();
        }catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}

class MyRouteBuilder extends RouteBuilder {
    private String from;
    private String to;
    public void configure(){
        from("file:C:/Users/JuISe/Desktop/jms?noop=true")
                .setHeader("from", xpath("file/from/text()").stringResult())
                .setHeader("to", xpath("file/to/text()").stringResult())
                .process(new Processor() {
                    @Override
                    public void process(Exchange exchange) throws Exception {
                        from = exchange.getIn().getHeader("from").toString();
                        to = exchange.getIn().getHeader("to").toString();
                    }
                })
                .pollEnrich("file:" + from)
                .to("file:" + to);
    }
}

It doesn't work. Here is the log:

[main] INFO org.apache.camel.impl.converter.DefaultTypeConverter - Loaded 216 type converters
[main] INFO org.apache.camel.impl.DefaultRuntimeEndpointRegistry - Runtime endpoint registry is in extended mode gathering usage statistics of all incoming and outgoing endpoints (cache limit: 1000)
[main] INFO org.apache.camel.impl.DefaultCamelContext - AllowUseOriginalMessage is enabled. If access to the original message is not needed, then its recommended to turn this option off as it may improve performance.
[main] INFO org.apache.camel.impl.DefaultCamelContext - StreamCaching is not in use. If using streams then its recommended to enable stream caching. See more details at http://camel.apache.org/stream-caching.html
[main] INFO org.apache.camel.component.file.FileEndpoint - Endpoint is configured with noop=true so forcing endpoint to be idempotent as well
[main] INFO org.apache.camel.component.file.FileEndpoint - Using default memory based idempotent repository with cache max size: 1000
[main] INFO org.apache.camel.impl.DefaultCamelContext - Route: route1 started and consuming from: Endpoint[file://C:/Users/JuISe/Desktop/jms?noop=true]
[main] INFO org.apache.camel.impl.DefaultCamelContext - Total 1 routes, of which 1 is started.
[main] INFO org.apache.camel.impl.DefaultCamelContext - Apache Camel 2.16.1 (CamelContext: camel-1) started in 1.033 seconds
[Camel (camel-1) thread #0 - file://C:/Users/JuISe/Desktop/jms] WARN org.apache.camel.component.file.strategy.MarkerFileExclusiveReadLockStrategy - Deleting orphaned lock file: C:\Users\JuISe\Desktop\jms\message.xml.camelLock
[Camel (camel-1) thread #0 - file://C:/Users/JuISe/Desktop/jms] INFO org.apache.camel.builder.xml.XPathBuilder - Created default XPathFactory com.sun.org.apache.xpath.internal.jaxp.XPathFactoryImpl@2308d4c8
[main] INFO org.apache.camel.impl.DefaultCamelContext - Apache Camel 2.16.1 (CamelContext: camel-1) is shutting down
[main] INFO org.apache.camel.impl.DefaultShutdownStrategy - Starting to graceful shutdown 1 routes (timeout 300 seconds)
[Camel (camel-1) thread #2 - ShutdownTask] INFO org.apache.camel.impl.DefaultShutdownStrategy - Waiting as there are still 2 inflight and pending exchanges to complete, timeout in 300 seconds. Inflights per route: [route1 = 2]
[Camel (camel-1) thread #2 - ShutdownTask] INFO org.apache.camel.impl.DefaultShutdownStrategy - Waiting as there are still 2 inflight and pending exchanges to complete, timeout in 299 seconds. Inflights per route: [route1 = 2]
[Camel (camel-1) thread #2 - ShutdownTask] INFO org.apache.camel.impl.DefaultShutdownStrategy - Waiting as there are still 2 inflight and pending exchanges to complete, timeout in 298 seconds. Inflights per route: [route1 = 2]
[Camel (camel-1) thread #2 - ShutdownTask] INFO org.apache.camel.impl.DefaultShutdownStrategy - Waiting as there are still 2 inflight and pending exchanges to complete, timeout in 297 seconds. Inflights per route: [route1 = 2]
[Camel (camel-1) thread #2 - ShutdownTask] INFO org.apache.camel.impl.DefaultShutdownStrategy - Waiting as there are still 2 inflight and pending exchanges to complete, timeout in 296 seconds. Inflights per route: [route1 = 2]
[Camel (camel-1) thread #2 - ShutdownTask] INFO org.apache.camel.impl.DefaultShutdownStrategy - Waiting as there are still 2 inflight and pending exchanges to complete, timeout in 295 seconds. Inflights per route: [route1 = 2]
[Camel (camel-1) thread #2 - ShutdownTask] INFO org.apache.camel.impl.DefaultShutdownStrategy - Waiting as there are still 2 inflight and pending exchanges to complete, timeout in 294 seconds. Inflights per route: [route1 = 2]

Thanks for your help!
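
A note on why the route in the question cannot work: configure() is called once, when the route is added to the context, so "file:" + from is concatenated while the field from is still null. Both endpoints therefore point at "file:null", which would fit the log above, where exchanges stay in flight until shutdown. Below is a plain-Java sketch of that timing with no Camel involved; the class BuildTimeVsRunTime and its methods are made up for illustration and are not Camel APIs.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for a route builder; none of these names are Camel APIs.
class BuildTimeVsRunTime {
    private String from; // assigned later, while a "message" is processed

    // Mirrors "file:" + from inside configure(): the string is built immediately.
    String eagerUri() {
        return "file:" + from;
    }

    // Defers the concatenation until get() is called.
    Supplier<String> lazyUri() {
        return () -> "file:" + from;
    }

    // Mirrors the Processor that copies the header into the field.
    void processMessage(String directory) {
        from = directory;
    }

    public static void main(String[] args) {
        BuildTimeVsRunTime demo = new BuildTimeVsRunTime();
        String eager = demo.eagerUri();         // built before any message exists
        Supplier<String> lazy = demo.lazyUri(); // nothing is built yet
        demo.processMessage("C:/Users/JuISe/Desktop/from");
        System.out.println(eager);      // prints "file:null"
        System.out.println(lazy.get()); // prints "file:C:/Users/JuISe/Desktop/from"
    }
}
```

The eager string never sees the value assigned later, which is the position pollEnrich("file:" + from) is in. The endpoint has to be resolved while a message is being processed, not while the route is being defined.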

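Try using a bean with producer and consumer templates. A file endpoint directory given as a plain string cannot be dynamic: the URI is fixed when the route is built. Something like this: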

from("file:/Users/smunirat/apps/destination/jms?noop=true")
            // Copy the source and target directories from the XML into headers.
            .setHeader("from", xpath("file/from/text()").stringResult())
            .setHeader("to", xpath("file/to/text()").stringResult())
            .to("log:Sundar?showAll=true&multiline=true")
            .process(new Processor() {
                @Override
                public void process(Exchange exchange) throws Exception {
                    // The headers are read here, at run time, once per incoming XML file.
                    String fromUri = "file://" + exchange.getIn().getHeader("from", String.class);
                    String toUri = "file://" + exchange.getIn().getHeader("to", String.class);
                    ConsumerTemplate consumerTemplate = exchange.getContext().createConsumerTemplate();
                    ProducerTemplate producerTemplate = exchange.getContext().createProducerTemplate();
                    try {
                        // receive(uri) blocks until one file is available in the source directory.
                        Exchange received = consumerTemplate.receive(fromUri);
                        // Only the body is forwarded, so the original file name is not kept.
                        producerTemplate.sendBody(toUri, received.getIn().getMandatoryBody());
                    } finally {
                        // Templates created per exchange should be stopped to release their resources.
                        consumerTemplate.stop();
                        producerTemplate.stop();
                    }
                }
            })
            .log("Message");

This may need some slight tweaking to change the file name and to delete the original file from the source location.
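
As a possible alternative, and assuming Camel 2.16 or later (the log in the question shows 2.16.1): as far as I know, that release added pollEnrich() with an expression and toD(), both of which compute the endpoint URI per message. A minimal sketch under that assumption follows; the class name DynamicFileRouteBuilder and the 5 second timeout are illustrative choices, not something from the question.

```java
import org.apache.camel.builder.RouteBuilder;

// Sketch only: assumes Camel 2.16+, where pollEnrich() and toD() accept expressions.
class DynamicFileRouteBuilder extends RouteBuilder {
    @Override
    public void configure() {
        from("file:C:/Users/JuISe/Desktop/jms?noop=true")
                // Store the directories named in the XML as headers.
                .setHeader("from", xpath("file/from/text()").stringResult())
                .setHeader("to", xpath("file/to/text()").stringResult())
                // Evaluated per message: poll one file from the source directory,
                // waiting at most 5 seconds (illustrative value).
                .pollEnrich().simple("file:${header.from}").timeout(5000)
                // Evaluated per message: write the polled file to the target directory.
                .toD("file:${header.to}");
    }
}
```

It would be registered the same way as MyRouteBuilder in the question, via context.addRoutes(new DynamicFileRouteBuilder()). Note that pollEnrich picks up a single file per incoming message, and if nothing arrives within the timeout the body may be empty, so real code would probably want a check before writing.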