如何使用 Apache Camel SQL 组件仅更新消耗的 sql 行?

How to update only consumed sql rows with Apache Camel SQL Component?

我是 Camel 的新手,我尝试处理 SQL 数据。如果 SQL 消耗 (select) 完成,我尝试更新消耗的行,但我只是得到一个 "bad SQL grammar" 异常。

我使用 Apache Camel SQL-组件,其中起始端点是 sql select 语句。为了将它们标记为已消耗,我使用 SQL-ComponentonConsume 参数。在select中,v_table是原来tablet_table的视图,以后更新时会用到。所以 v_tablet_table 中一行的 id 是相同的。为了不更新 t_table 中的所有行,我将 where 条件与 where id = :#id.

一起使用
String sqlSelect = "select * from v_table where camel_is_read = 0";
String sqlUpdate = "update t_table set camel_is_read = 1, date_checked = sysdate where id = :#id";
from("sql:"+sqlSelect+"?dataSource=myDataSource&onConsume="+sqlUpdate)
.process(new Processor() {
    public void process(Exchange exchange) throws Exception {
        System.out.println(exchange.getIn().getBody().toString());
    }
})                    
.errorHandler(deadLetterChannel("direct:moveFailedOut").useOriginalMessage())
.bean("orderToJms")
.to(jmsURI)
.bean("validate")
.to(ftpOut);

如果我执行这个,我得到以下异常:

WARN  Error executing onConsume/onConsumeFailed query update t_table set camel_is_read = 1, date_checked = sysdate where id = :?id. Caused by: [org.springframework.jdbc.BadSqlGrammarException - PreparedStatementCallback; bad SQL grammar [update t_table set camel_is_read = 1, date_checked = sysdate where id = ?]; nested exception is java.sql.SQLSyntaxErrorException: ORA-00904: "YSDATEHERE": ungültiger Bezeichner
]
org.springframework.jdbc.BadSqlGrammarException: PreparedStatementCallback; bad SQL grammar [update t_table set camel_is_read = 1, date_checked = sysdate where id = ? exception is java.sql.SQLSyntaxErrorException: ORA-00904: "YSDATEHERE": ungültiger Bezeichner

    at org.springframework.jdbc.support.SQLErrorCodeSQLExceptionTranslator.doTranslate(SQLErrorCodeSQLExceptionTranslator.java:237)
    at org.springframework.jdbc.support.AbstractFallbackSQLExceptionTranslator.translate(AbstractFallbackSQLExceptionTranslator.java:72)
    at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:605)
    at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:617)
    at org.apache.camel.component.sql.DefaultSqlProcessingStrategy.commit(DefaultSqlProcessingStrategy.java:46)
    at org.apache.camel.component.sql.SqlConsumer.processBatch(SqlConsumer.java:195)
    at org.apache.camel.component.sql.SqlConsumer.doInPreparedStatement(SqlConsumer.java:118)
    at org.apache.camel.component.sql.SqlConsumer.doInPreparedStatement(SqlConsumer.java:91)
    at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:589)
    at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:617)
    at org.apache.camel.component.sql.SqlConsumer.poll(SqlConsumer.java:91)
    at org.apache.camel.impl.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:174)
    at org.apache.camel.impl.ScheduledPollConsumer.run(ScheduledPollConsumer.java:101)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access1(ScheduledThreadPoolExecutor.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:724)
Caused by: java.sql.SQLSyntaxErrorException: ORA-00904: "YSDATEHERE": ungültiger Bezeichner

我尝试使用测试 ID(例如“3”)在数据库中手动执行更新,这有效,所以通用 SQL 语法应该没问题。所以在我看来,Camel 无法替换 :#id 参数。

我添加了一个处理器,以检查 select:

的结果
{ID=3, [...] CAMEL_IS_READ=0}

在这里我可以看到,select 成功捕获了必要的 id。我不明白为什么 Camel 无法用 id 值 3 替换 :#id 参数。有人知道如何解决这个问题吗?我使用 this and this 作为粗略的 example/template。或者这种方法通常是错误的?

最后,通过用 to_date() 函数包围 sysdate 语句,它对我有用:

String sqlSelect = "select * from v_table where camel_is_read = 0";
String sqlUpdate = "update t_table set camel_is_read = 1, date_checked = to_date(sysdate) where id = :#id";
from("sql:"+sqlSelect+"?dataSource=myDataSource&onConsume="+sqlUpdate)
.process(new Processor() {
    public void process(Exchange exchange) throws Exception {
        System.out.println(exchange.getIn().getBody().toString());
    }
})                    
.errorHandler(deadLetterChannel("direct:moveFailedOut").useOriginalMessage())
.bean("orderToJms")
.to(jmsURI)
.bean("validate")
.to(ftpOut);

一些注意事项,什么不起作用/不是一个好主意(在我看来)并且可能会节省您的时间:
在某些情况下 onConsumeBatchComplete 可能是一个解决方案。如果您知道条件,您可能会暗示,如果具有此条件的所有行都通过 camel 传递,则可以更新这些行。这与 sysdate 一起工作并且不需要 id 占位符。这个解决方案的缺点是,它是牵连的,如果同时添加新行,骆驼没有通过,它们也会在批处理完成后更新。所以我不能推荐这个。

在Java中设置时间也不起作用,像这样:

String sqlUpdate = "update t_table date_checked = " + today + " where id = :#id";

如果您不是每天都重新开始您的路线,骆驼路线会在几天、几周或几年内处于活动状态。在这种情况下,camel 路线将在启动时生成,此后,"todays" 日期始终固定为这一天的日期。因此它将始终使用相同的日期进行数据库更新。