select 查询的骆驼路径语法并将数据传输到 postgresql 的插入查询中
camel path syntax for select query and transferring the data into insert query for postgresql
我正在为我的企业应用程序使用 camel 2.15 和 java 1.7。我们的数据库是 postgresql 9.4.0。
我正在尝试为我的要求是使用 select 查询从一个 postgresql 模式读取数据并将相同数据插入另一个 postgre[=37] 的场景配置骆驼路线=] 模式。
我使用以下语法从 postgresql -
中获取数据
from("timer://foo?period="+serverData.getTimer()).threads(serverData.getNumberOfInstances()).setBody(constant(serverData.getSql())).
to("jdbc:" + serverData.getUrl() + serverData.getUserId() + "?outputType=SelectList").
process(custom application logic)
使用上述语法,我可以获取数据并处理成 process
方法中存在的自定义逻辑。
注意 - 在上面的语法中,serverData
class 用于获取 postgresql 连接详细信息和 sql 详细信息。
现在,我的问题是 - 如何定义骆驼路线,以便 selected 数据可以通过插入查询直接插入到另一个 postgresql table 中。
我尝试使用以下语法但没有成功。
from("timer://foo?period="+serverData.getTimer()).threads(serverData.getNumberOfInstances()).setBody(constant(serverData.getSql())).
setBody(constant("insert into new_raw_table (numbersubscribers,id) values(:?numbersubscribers,:?id)")).
to("jdbc:" + serverData.getUrl() + serverData.getUserId()).
process(custom application logic)
或
from("timer://foo?period="+serverData.getTimer()).threads(serverData.getNumberOfInstances()).setBody(constant(serverData.getSql())).
to("jdbc:" + serverData.getUrl() + serverData.getUserId() + "?consumer.onConsume=insert into new_raw_table(NumberSubscribers,ID) values(:#NumberSubscribers,:#ID)");
process(custom application logic)
两种语法都不能将 selected 数据从一个 postgresql 插入到另一个 postgresql 模式。
您有 2 个查询,但只有一个 jdbc 端点。我认为,您需要将另一个 jdbc 端点与另一个数据源一起使用,因为您有另一个数据库。
但这并不能解决单笔交易中两次查询的问题。有关交易客户端的更多信息,请参见此处:http://camel.apache.org/transactional-client.html
更新 2:
from("timer://foo?period="+serverData.getTimer())
.threads(serverData.getNumberOfInstances())
.setBody(constant(serverData.getSql()))
.to("jdbc:" + serverData.getUrl() + serverData.getUserId())
//Think about query parameters, based on first query result
//You need to define them, by default the result is returned in the OUT body
//ArrayList<HashMap<String, Object>> in the OUT body
.to("log:before-split?level=INFO&showAll=true&multiline=true")
.split(body()) //because you received more then one record
.to("log:after-split?level=INFO&showAll=true&multiline=true")
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
//Record of first query after splitter in the Map
Map<String, Object> record = exchange.getOut().getBody(Map.class);
exchange.getIn().setHeader("numbersubscribers", record.get("numbersubscribers"));
exchange.getIn().setHeader("id", record.get("id"));
}
})
.to("log:before2query?level=INFO&showAll=true&multiline=true")
.setBody(constant("insert into new_raw_table (numbersubscribers,values(:?numbersubscribers,:?id)"))
.to("jdbc:".....) //You have to refer here to another datasource, maybe like below
//jdbc:embeddedDataSource?allowNamedParameters=true&outputType=SelectOne&useHeadersAsParameters=true&resetAutoCommit=false
.process(custom application logic);
感谢@Alexey 关于添加日志记录的建议。
这就是我的工作方式。骆驼路线语法将是 -
from("timer://foo?period="+serverData.getTimer()).threads(serverData.getNumberOfInstances()).setBody(constant(serverData.getSql())).
to("jdbc:" + serverData.getUrl() + serverData.getUserId()).
split(body()).process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
Map<String, Object> record = exchange.getIn().getBody(Map.class);
exchange.getIn().setHeader("numbersubscribers", record.get("numbersubscribers"));
exchange.getIn().setHeader("id", record.get("id"));
}
}).
setBody(constant("insert into test_table (numbersubscribers, id) values(:?numbersubscribers, :?id)")).
to("jdbc:" + serverData.getUrl() + serverData.getUserId()+"?allowNamedParameters=true&outputType=SelectOne&useHeadersAsParameters=true&resetAutoCommit=false");}
在我的场景中,select 查询数据以 "InOnly" 交换模式存在,所以当我 exchange.getOut().getBody(Map.class)
获取数据时,它向我抛出 空指针异常。通过日志记录,我发现了交换模式的类型并更正为 exchange.getIn().getBody(Map.class)
并且工作正常。
为了将来参考,我列出了日志堆栈跟踪。
Exchange[
, Id: ID-5CG4525D29-58712-1450892474425-0-2
, ExchangePattern: InOnly
, Properties: {CamelCreatedTimestamp=Wed Dec 23 23:11:16 IST 2015, CamelMessageHistory=[DefaultMessageHistory[routeId=route1, node=threads1], DefaultMessageHistory[routeId=route1, node=setBody1], DefaultMessageHistory[routeId=route1, node=to1], DefaultMessageHistory[routeId=route1, node=to2]], CamelTimerCounter=1, CamelTimerFiredTime=Wed Dec 23 23:11:16 IST 2015, CamelTimerName=foo, CamelTimerPeriod=15000, CamelToEndpoint=log://before-split?level=INFO&multiline=true&showAll=true}
, Headers: {breadcrumbId=ID-5CG4525D29-58712-1450892474425-0-1, CamelJdbcColumnNames=[numbersubscribers, id], CamelJdbcRowCount=2, firedTime=Wed Dec 23 23:11:16 IST 2015}
, BodyType: java.util.ArrayList
, Body: [{numbersubscribers=11, id=22}, {numbersubscribers=55, id=66}]
这可能会帮助其他面临类似问题的人。谢谢
我正在为我的企业应用程序使用 camel 2.15 和 java 1.7。我们的数据库是 postgresql 9.4.0。
我正在尝试为我的要求是使用 select 查询从一个 postgresql 模式读取数据并将相同数据插入另一个 postgre[=37] 的场景配置骆驼路线=] 模式。
我使用以下语法从 postgresql -
中获取数据from("timer://foo?period="+serverData.getTimer()).threads(serverData.getNumberOfInstances()).setBody(constant(serverData.getSql())).
to("jdbc:" + serverData.getUrl() + serverData.getUserId() + "?outputType=SelectList").
process(custom application logic)
使用上述语法,我可以获取数据并处理成 process
方法中存在的自定义逻辑。
注意 - 在上面的语法中,serverData
class 用于获取 postgresql 连接详细信息和 sql 详细信息。
现在,我的问题是 - 如何定义骆驼路线,以便 selected 数据可以通过插入查询直接插入到另一个 postgresql table 中。
我尝试使用以下语法但没有成功。
from("timer://foo?period="+serverData.getTimer()).threads(serverData.getNumberOfInstances()).setBody(constant(serverData.getSql())).
setBody(constant("insert into new_raw_table (numbersubscribers,id) values(:?numbersubscribers,:?id)")).
to("jdbc:" + serverData.getUrl() + serverData.getUserId()).
process(custom application logic)
或
from("timer://foo?period="+serverData.getTimer()).threads(serverData.getNumberOfInstances()).setBody(constant(serverData.getSql())).
to("jdbc:" + serverData.getUrl() + serverData.getUserId() + "?consumer.onConsume=insert into new_raw_table(NumberSubscribers,ID) values(:#NumberSubscribers,:#ID)");
process(custom application logic)
两种语法都不能将 selected 数据从一个 postgresql 插入到另一个 postgresql 模式。
您有 2 个查询,但只有一个 jdbc 端点。我认为,您需要将另一个 jdbc 端点与另一个数据源一起使用,因为您有另一个数据库。
但这并不能解决单笔交易中两次查询的问题。有关交易客户端的更多信息,请参见此处:http://camel.apache.org/transactional-client.html
更新 2:
from("timer://foo?period="+serverData.getTimer())
.threads(serverData.getNumberOfInstances())
.setBody(constant(serverData.getSql()))
.to("jdbc:" + serverData.getUrl() + serverData.getUserId())
//Think about query parameters, based on first query result
//You need to define them, by default the result is returned in the OUT body
//ArrayList<HashMap<String, Object>> in the OUT body
.to("log:before-split?level=INFO&showAll=true&multiline=true")
.split(body()) //because you received more then one record
.to("log:after-split?level=INFO&showAll=true&multiline=true")
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
//Record of first query after splitter in the Map
Map<String, Object> record = exchange.getOut().getBody(Map.class);
exchange.getIn().setHeader("numbersubscribers", record.get("numbersubscribers"));
exchange.getIn().setHeader("id", record.get("id"));
}
})
.to("log:before2query?level=INFO&showAll=true&multiline=true")
.setBody(constant("insert into new_raw_table (numbersubscribers,values(:?numbersubscribers,:?id)"))
.to("jdbc:".....) //You have to refer here to another datasource, maybe like below
//jdbc:embeddedDataSource?allowNamedParameters=true&outputType=SelectOne&useHeadersAsParameters=true&resetAutoCommit=false
.process(custom application logic);
感谢@Alexey 关于添加日志记录的建议。
这就是我的工作方式。骆驼路线语法将是 -
from("timer://foo?period="+serverData.getTimer()).threads(serverData.getNumberOfInstances()).setBody(constant(serverData.getSql())).
to("jdbc:" + serverData.getUrl() + serverData.getUserId()).
split(body()).process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
Map<String, Object> record = exchange.getIn().getBody(Map.class);
exchange.getIn().setHeader("numbersubscribers", record.get("numbersubscribers"));
exchange.getIn().setHeader("id", record.get("id"));
}
}).
setBody(constant("insert into test_table (numbersubscribers, id) values(:?numbersubscribers, :?id)")).
to("jdbc:" + serverData.getUrl() + serverData.getUserId()+"?allowNamedParameters=true&outputType=SelectOne&useHeadersAsParameters=true&resetAutoCommit=false");}
在我的场景中,select 查询数据以 "InOnly" 交换模式存在,所以当我 exchange.getOut().getBody(Map.class)
获取数据时,它向我抛出 空指针异常。通过日志记录,我发现了交换模式的类型并更正为 exchange.getIn().getBody(Map.class)
并且工作正常。
为了将来参考,我列出了日志堆栈跟踪。
Exchange[
, Id: ID-5CG4525D29-58712-1450892474425-0-2
, ExchangePattern: InOnly
, Properties: {CamelCreatedTimestamp=Wed Dec 23 23:11:16 IST 2015, CamelMessageHistory=[DefaultMessageHistory[routeId=route1, node=threads1], DefaultMessageHistory[routeId=route1, node=setBody1], DefaultMessageHistory[routeId=route1, node=to1], DefaultMessageHistory[routeId=route1, node=to2]], CamelTimerCounter=1, CamelTimerFiredTime=Wed Dec 23 23:11:16 IST 2015, CamelTimerName=foo, CamelTimerPeriod=15000, CamelToEndpoint=log://before-split?level=INFO&multiline=true&showAll=true}
, Headers: {breadcrumbId=ID-5CG4525D29-58712-1450892474425-0-1, CamelJdbcColumnNames=[numbersubscribers, id], CamelJdbcRowCount=2, firedTime=Wed Dec 23 23:11:16 IST 2015}
, BodyType: java.util.ArrayList
, Body: [{numbersubscribers=11, id=22}, {numbersubscribers=55, id=66}]
这可能会帮助其他面临类似问题的人。谢谢