Apache Camel 动态端点
Apache Camel dynamic endpoints
我开始学习 Apache Camel 并遇到了问题。
我需要从文件系统中读取 XML 文件,对其进行解析并将此 XML 中指定的某些文件传输到另一个位置。
这是位于 "C:/Users/JuISe/Desktop/jms" 的 XML 的示例。
<file>
<from>C:/Users/JuISe/Desktop/from</from>
<to>C:/Users/JuISe/Desktop/to</to>
</file>
表示从
"C:/Users/JuISe/Desktop/from" 目录到 "C:/Users/JuISe/Desktop/to"
这是我的代码:
public class FileShifter {
public static void main(String args[]) {
CamelContext context = new DefaultCamelContext();
try {
context.addRoutes(new MyRouteBuilder());
context.start();
Thread.sleep(10000);
context.stop();
}catch (Exception ex) {
ex.printStackTrace();
}
}
}
class MyRouteBuilder extends RouteBuilder {
private String from;
private String to;
public void configure(){
from("file:C:/Users/JuISe/Desktop/jms?noop=true")
.setHeader("from", xpath("file/from/text()").stringResult())
.setHeader("to", xpath("file/to/text()").stringResult())
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
from = exchange.getIn().getHeader("from").toString();
to = exchange.getIn().getHeader("to").toString();
}
})
.pollEnrich("file:" + from)
.to("file:" + to);
}
}
没用。
这是日志:
[main] INFO org.apache.camel.impl.converter.DefaultTypeConverter - Loaded 216 type converters
[main] INFO org.apache.camel.impl.DefaultRuntimeEndpointRegistry - Runtime endpoint registry is in extended mode gathering usage statistics of all incoming and outgoing endpoints (cache limit: 1000)
[main] INFO org.apache.camel.impl.DefaultCamelContext - AllowUseOriginalMessage is enabled. If access to the original message is not needed, then its recommended to turn this option off as it may improve performance.
[main] INFO org.apache.camel.impl.DefaultCamelContext - StreamCaching is not in use. If using streams then its recommended to enable stream caching. See more details at http://camel.apache.org/stream-caching.html
[main] INFO org.apache.camel.component.file.FileEndpoint - Endpoint is configured with noop=true so forcing endpoint to be idempotent as well
[main] INFO org.apache.camel.component.file.FileEndpoint - Using default memory based idempotent repository with cache max size: 1000
[main] INFO org.apache.camel.impl.DefaultCamelContext - Route: route1 started and consuming from: Endpoint[file://C:/Users/JuISe/Desktop/jms?noop=true]
[main] INFO org.apache.camel.impl.DefaultCamelContext - Total 1 routes, of which 1 is started.
[main] INFO org.apache.camel.impl.DefaultCamelContext - Apache Camel 2.16.1 (CamelContext: camel-1) started in 1.033 seconds
[Camel (camel-1) thread #0 - file://C:/Users/JuISe/Desktop/jms] WARN org.apache.camel.component.file.strategy.MarkerFileExclusiveReadLockStrategy - Deleting orphaned lock file: C:\Users\JuISe\Desktop\jms\message.xml.camelLock
[Camel (camel-1) thread #0 - file://C:/Users/JuISe/Desktop/jms] INFO org.apache.camel.builder.xml.XPathBuilder - Created default XPathFactory com.sun.org.apache.xpath.internal.jaxp.XPathFactoryImpl@2308d4c8
[main] INFO org.apache.camel.impl.DefaultCamelContext - Apache Camel 2.16.1 (CamelContext: camel-1) is shutting down
[main] INFO org.apache.camel.impl.DefaultShutdownStrategy - Starting to graceful shutdown 1 routes (timeout 300 seconds)
[Camel (camel-1) thread #2 - ShutdownTask] INFO org.apache.camel.impl.DefaultShutdownStrategy - Waiting as there are still 2 inflight and pending exchanges to complete, timeout in 300 seconds. Inflights per route: [route1 = 2]
[Camel (camel-1) thread #2 - ShutdownTask] INFO org.apache.camel.impl.DefaultShutdownStrategy - Waiting as there are still 2 inflight and pending exchanges to complete, timeout in 299 seconds. Inflights per route: [route1 = 2]
[Camel (camel-1) thread #2 - ShutdownTask] INFO org.apache.camel.impl.DefaultShutdownStrategy - Waiting as there are still 2 inflight and pending exchanges to complete, timeout in 298 seconds. Inflights per route: [route1 = 2]
[Camel (camel-1) thread #2 - ShutdownTask] INFO org.apache.camel.impl.DefaultShutdownStrategy - Waiting as there are still 2 inflight and pending exchanges to complete, timeout in 297 seconds. Inflights per route: [route1 = 2]
[Camel (camel-1) thread #2 - ShutdownTask] INFO org.apache.camel.impl.DefaultShutdownStrategy - Waiting as there are still 2 inflight and pending exchanges to complete, timeout in 296 seconds. Inflights per route: [route1 = 2]
[Camel (camel-1) thread #2 - ShutdownTask] INFO org.apache.camel.impl.DefaultShutdownStrategy - Waiting as there are still 2 inflight and pending exchanges to complete, timeout in 295 seconds. Inflights per route: [route1 = 2]
[Camel (camel-1) thread #2 - ShutdownTask] INFO org.apache.camel.impl.DefaultShutdownStrategy - Waiting as there are still 2 inflight and pending exchanges to complete, timeout in 294 seconds. Inflights per route: [route1 = 2]
感谢您的帮助!
尝试使用带有生产者和消费者模板的 bean,文件端点目录不能是动态的
from("file:/Users/smunirat/apps/destination/jms?noop=true")
.setHeader("from", xpath("file/from/text()").stringResult())
.setHeader("to", xpath("file/to/text()").stringResult())
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
from = exchange.getIn().getHeader("from").toString();
to = exchange.getIn().getHeader("to").toString();
exchange.getOut().setHeader("from", from);
exchange.getOut().setHeader("to", to);
}
})
.to("log:Sundar?showAll=true&multiline=true")
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
ConsumerTemplate createConsumerTemplate = exchange.getContext().createConsumerTemplate();
ProducerTemplate createProducerTemplate = exchange.getContext().createProducerTemplate();
Exchange receive = createConsumerTemplate.receive("file://"+exchange.getIn().getHeader("from"));
createProducerTemplate.sendBody("file://"+exchange.getIn().getHeader("to"),receive.getIn().getMandatoryBody());
}
})
.log("Message");
这可能需要稍作调整以更改文件名并从源位置删除原始文件
我开始学习 Apache Camel 并遇到了问题。
我需要从文件系统中读取 XML 文件,对其进行解析并将此 XML 中指定的某些文件传输到另一个位置。
这是位于 "C:/Users/JuISe/Desktop/jms" 的 XML 的示例。
<file>
<from>C:/Users/JuISe/Desktop/from</from>
<to>C:/Users/JuISe/Desktop/to</to>
</file>
表示从 "C:/Users/JuISe/Desktop/from" 目录到 "C:/Users/JuISe/Desktop/to"
这是我的代码:
public class FileShifter {
public static void main(String args[]) {
CamelContext context = new DefaultCamelContext();
try {
context.addRoutes(new MyRouteBuilder());
context.start();
Thread.sleep(10000);
context.stop();
}catch (Exception ex) {
ex.printStackTrace();
}
}
}
class MyRouteBuilder extends RouteBuilder {
private String from;
private String to;
public void configure(){
from("file:C:/Users/JuISe/Desktop/jms?noop=true")
.setHeader("from", xpath("file/from/text()").stringResult())
.setHeader("to", xpath("file/to/text()").stringResult())
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
from = exchange.getIn().getHeader("from").toString();
to = exchange.getIn().getHeader("to").toString();
}
})
.pollEnrich("file:" + from)
.to("file:" + to);
}
}
没用。 这是日志:
[main] INFO org.apache.camel.impl.converter.DefaultTypeConverter - Loaded 216 type converters
[main] INFO org.apache.camel.impl.DefaultRuntimeEndpointRegistry - Runtime endpoint registry is in extended mode gathering usage statistics of all incoming and outgoing endpoints (cache limit: 1000)
[main] INFO org.apache.camel.impl.DefaultCamelContext - AllowUseOriginalMessage is enabled. If access to the original message is not needed, then its recommended to turn this option off as it may improve performance.
[main] INFO org.apache.camel.impl.DefaultCamelContext - StreamCaching is not in use. If using streams then its recommended to enable stream caching. See more details at http://camel.apache.org/stream-caching.html
[main] INFO org.apache.camel.component.file.FileEndpoint - Endpoint is configured with noop=true so forcing endpoint to be idempotent as well
[main] INFO org.apache.camel.component.file.FileEndpoint - Using default memory based idempotent repository with cache max size: 1000
[main] INFO org.apache.camel.impl.DefaultCamelContext - Route: route1 started and consuming from: Endpoint[file://C:/Users/JuISe/Desktop/jms?noop=true]
[main] INFO org.apache.camel.impl.DefaultCamelContext - Total 1 routes, of which 1 is started.
[main] INFO org.apache.camel.impl.DefaultCamelContext - Apache Camel 2.16.1 (CamelContext: camel-1) started in 1.033 seconds
[Camel (camel-1) thread #0 - file://C:/Users/JuISe/Desktop/jms] WARN org.apache.camel.component.file.strategy.MarkerFileExclusiveReadLockStrategy - Deleting orphaned lock file: C:\Users\JuISe\Desktop\jms\message.xml.camelLock
[Camel (camel-1) thread #0 - file://C:/Users/JuISe/Desktop/jms] INFO org.apache.camel.builder.xml.XPathBuilder - Created default XPathFactory com.sun.org.apache.xpath.internal.jaxp.XPathFactoryImpl@2308d4c8
[main] INFO org.apache.camel.impl.DefaultCamelContext - Apache Camel 2.16.1 (CamelContext: camel-1) is shutting down
[main] INFO org.apache.camel.impl.DefaultShutdownStrategy - Starting to graceful shutdown 1 routes (timeout 300 seconds)
[Camel (camel-1) thread #2 - ShutdownTask] INFO org.apache.camel.impl.DefaultShutdownStrategy - Waiting as there are still 2 inflight and pending exchanges to complete, timeout in 300 seconds. Inflights per route: [route1 = 2]
[Camel (camel-1) thread #2 - ShutdownTask] INFO org.apache.camel.impl.DefaultShutdownStrategy - Waiting as there are still 2 inflight and pending exchanges to complete, timeout in 299 seconds. Inflights per route: [route1 = 2]
[Camel (camel-1) thread #2 - ShutdownTask] INFO org.apache.camel.impl.DefaultShutdownStrategy - Waiting as there are still 2 inflight and pending exchanges to complete, timeout in 298 seconds. Inflights per route: [route1 = 2]
[Camel (camel-1) thread #2 - ShutdownTask] INFO org.apache.camel.impl.DefaultShutdownStrategy - Waiting as there are still 2 inflight and pending exchanges to complete, timeout in 297 seconds. Inflights per route: [route1 = 2]
[Camel (camel-1) thread #2 - ShutdownTask] INFO org.apache.camel.impl.DefaultShutdownStrategy - Waiting as there are still 2 inflight and pending exchanges to complete, timeout in 296 seconds. Inflights per route: [route1 = 2]
[Camel (camel-1) thread #2 - ShutdownTask] INFO org.apache.camel.impl.DefaultShutdownStrategy - Waiting as there are still 2 inflight and pending exchanges to complete, timeout in 295 seconds. Inflights per route: [route1 = 2]
[Camel (camel-1) thread #2 - ShutdownTask] INFO org.apache.camel.impl.DefaultShutdownStrategy - Waiting as there are still 2 inflight and pending exchanges to complete, timeout in 294 seconds. Inflights per route: [route1 = 2]
感谢您的帮助!
尝试使用带有生产者和消费者模板的 bean,文件端点目录不能是动态的
from("file:/Users/smunirat/apps/destination/jms?noop=true")
.setHeader("from", xpath("file/from/text()").stringResult())
.setHeader("to", xpath("file/to/text()").stringResult())
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
from = exchange.getIn().getHeader("from").toString();
to = exchange.getIn().getHeader("to").toString();
exchange.getOut().setHeader("from", from);
exchange.getOut().setHeader("to", to);
}
})
.to("log:Sundar?showAll=true&multiline=true")
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
ConsumerTemplate createConsumerTemplate = exchange.getContext().createConsumerTemplate();
ProducerTemplate createProducerTemplate = exchange.getContext().createProducerTemplate();
Exchange receive = createConsumerTemplate.receive("file://"+exchange.getIn().getHeader("from"));
createProducerTemplate.sendBody("file://"+exchange.getIn().getHeader("to"),receive.getIn().getMandatoryBody());
}
})
.log("Message");
这可能需要稍作调整以更改文件名并从源位置删除原始文件