select 查询的骆驼路径语法并将数据传输到 postgresql 的插入查询中

camel path syntax for select query and transferring the data into insert query for postgresql

我正在为我的企业应用程序使用 camel 2.15 和 java 1.7。我们的数据库是 postgresql 9.4.0。

我正在尝试为我的要求是使用 select 查询从一个 postgresql 模式读取数据并将相同数据插入另一个 postgre[=37] 的场景配置骆驼路线=] 模式。

我使用以下语法从 postgresql -

中获取数据
from("timer://foo?period="+serverData.getTimer()).threads(serverData.getNumberOfInstances()).setBody(constant(serverData.getSql())).
to("jdbc:" + serverData.getUrl() + serverData.getUserId() + "?outputType=SelectList").
process(custom application logic)

使用上述语法,我可以获取数据并处理成 process 方法中存在的自定义逻辑。

注意 - 在上面的语法中,serverData class 用于获取 postgresql 连接详细信息和 sql 详细信息。

现在,我的问题是 - 如何定义骆驼路线,以便 selected 数据可以通过插入查询直接插入到另一个 postgresql table 中。

我尝试使用以下语法但没有成功。

from("timer://foo?period="+serverData.getTimer()).threads(serverData.getNumberOfInstances()).setBody(constant(serverData.getSql())).
setBody(constant("insert into new_raw_table (numbersubscribers,id) values(:?numbersubscribers,:?id)")).
to("jdbc:" + serverData.getUrl() + serverData.getUserId()).
process(custom application logic)

from("timer://foo?period="+serverData.getTimer()).threads(serverData.getNumberOfInstances()).setBody(constant(serverData.getSql())).
to("jdbc:" + serverData.getUrl() + serverData.getUserId() + "?consumer.onConsume=insert into new_raw_table(NumberSubscribers,ID) values(:#NumberSubscribers,:#ID)");
process(custom application logic)

两种语法都不能将 selected 数据从一个 postgresql 插入到另一个 postgresql 模式。

您有 2 个查询,但只有一个 jdbc 端点。我认为,您需要将另一个 jdbc 端点与另一个数据源一起使用,因为您有另一个数据库。

但这并不能解决单笔交易中两次查询的问题。有关交易客户端的更多信息,请参见此处:http://camel.apache.org/transactional-client.html

更新 2:

from("timer://foo?period="+serverData.getTimer())
  .threads(serverData.getNumberOfInstances())
  .setBody(constant(serverData.getSql()))
  .to("jdbc:" + serverData.getUrl() + serverData.getUserId())
  //Think about query parameters, based on first query result
  //You need to define them, by default the result is returned in the OUT body 
  //ArrayList<HashMap<String, Object>> in the OUT body
   .to("log:before-split?level=INFO&showAll=true&multiline=true")
   .split(body()) //because you received more then one record
   .to("log:after-split?level=INFO&showAll=true&multiline=true")
   .process(new Processor() {
        @Override
        public void process(Exchange exchange) throws Exception {
            //Record of first query after splitter in the Map
            Map<String, Object> record = exchange.getOut().getBody(Map.class);
            exchange.getIn().setHeader("numbersubscribers",  record.get("numbersubscribers"));
            exchange.getIn().setHeader("id",  record.get("id"));
        }
    })
   .to("log:before2query?level=INFO&showAll=true&multiline=true")
   .setBody(constant("insert into new_raw_table (numbersubscribers,values(:?numbersubscribers,:?id)"))

  .to("jdbc:".....) //You have to refer here to another datasource, maybe like below
 //jdbc:embeddedDataSource?allowNamedParameters=true&outputType=SelectOne&useHeadersAsParameters=true&resetAutoCommit=false
  .process(custom application logic);

感谢@Alexey 关于添加日志记录的建议。

这就是我的工作方式。骆驼路线语法将是 -

from("timer://foo?period="+serverData.getTimer()).threads(serverData.getNumberOfInstances()).setBody(constant(serverData.getSql())).
to("jdbc:" + serverData.getUrl() + serverData.getUserId()).
split(body()).process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
       Map<String, Object> record = exchange.getIn().getBody(Map.class);
       exchange.getIn().setHeader("numbersubscribers",  record.get("numbersubscribers"));
       exchange.getIn().setHeader("id",  record.get("id"));
 }
}).
setBody(constant("insert into test_table (numbersubscribers, id) values(:?numbersubscribers, :?id)")).
to("jdbc:" + serverData.getUrl() + serverData.getUserId()+"?allowNamedParameters=true&outputType=SelectOne&useHeadersAsParameters=true&resetAutoCommit=false");}

在我的场景中,select 查询数据以 "InOnly" 交换模式存在,所以当我 exchange.getOut().getBody(Map.class) 获取数据时,它向我抛出 空指针异常。通过日志记录,我发现了交换模式的类型并更正为 exchange.getIn().getBody(Map.class) 并且工作正常。

为了将来参考,我列出了日志堆栈跟踪。

  Exchange[
  , Id: ID-5CG4525D29-58712-1450892474425-0-2
  , ExchangePattern: InOnly
  , Properties: {CamelCreatedTimestamp=Wed Dec 23 23:11:16 IST 2015,        CamelMessageHistory=[DefaultMessageHistory[routeId=route1, node=threads1], DefaultMessageHistory[routeId=route1, node=setBody1], DefaultMessageHistory[routeId=route1, node=to1], DefaultMessageHistory[routeId=route1, node=to2]], CamelTimerCounter=1, CamelTimerFiredTime=Wed Dec 23 23:11:16 IST 2015, CamelTimerName=foo, CamelTimerPeriod=15000, CamelToEndpoint=log://before-split?level=INFO&multiline=true&showAll=true}
  , Headers: {breadcrumbId=ID-5CG4525D29-58712-1450892474425-0-1, CamelJdbcColumnNames=[numbersubscribers, id], CamelJdbcRowCount=2, firedTime=Wed Dec 23 23:11:16 IST 2015}
  , BodyType: java.util.ArrayList
  , Body: [{numbersubscribers=11, id=22}, {numbersubscribers=55, id=66}]

这可能会帮助其他面临类似问题的人。谢谢