Camel enrich SQL 语法问题

Camel enrich SQL syntax issue

我的任务是使用 Camel 版本 2.20.0 创建一个 Camel 路由,该路由从 CSV 文件中获取一行,使用 SQL 语句 where 子句中该行的值并合并结果和再次输出它们。如果我在 SQL 语句中硬编码标识符,它工作正常,如果我尝试使用动态 URI,我会收到错误消息。

路线是:

from("file:///tmp?fileName=test.csv")
.split()
.tokenize("\n")
.streaming()
.parallelProcessing(true)
.setHeader("userID", constant("1001"))
//.enrich("sql:select emplid,name from employees where emplid = '1001'",
.enrich("sql:select name from employees where emplid = :#userID",
     new AggregationStrategy() {
        public Exchange aggregate(Exchange oldExchange,
                                     Exchange newExchange)    {...

正如我所说,如果我取消注释带有硬编码 1001 的行,它会查询数据库并按预期工作。但是使用 ':#userID' 语法我得到一个 Oracle 错误:

java.sql.SQLSyntaxErrorException: ORA-00942: table or view does not exist


    Message History
    ---------------------------------------------------------------------------------------------------------------------------------------
    RouteId              ProcessorId          Processor                                                                        Elapsed (ms)
    [route3            ] [route3            ] [file:///tmp?fileName=test.csv                                                 ] [        43]
    [route3            ] [log5              ] [log                                                                           ] [         2]
    [route3            ] [setHeader2        ] [setHeader[userID]                                                             ] [         0]
    [route3            ] [enrich2           ] [enrich[constant{sql:select name from employees where emplid = :#userID] [        40]

table 显然在那里,因为它在值被硬编码时起作用,因此它与传入动态值有关。我已经尝试了很多关于如何在单引号内传递该变量的变体,使用 body 中的值而不是 headers 等,但我还没有找到有效的组合我见过很多类似的看似有效的例子。

我打开了跟踪功能,看来 header 也已正确设置:

o.a.camel.processor.interceptor.Tracer   :  >>> (route3) setHeader[userID, 1001] --> enrich[constant{sql:select name from employees where emplid = :#userID}] <<< Pattern:InOnly, Headers:{CamelFileAbsolute=true, CamelFileAbsolutePath=/tmp/test.csv, CamelFileLastModified=1513116018000, CamelFileLength=26, CamelFileName=test.csv, CamelFileNameConsumed=test.csv, CamelFileNameOnly=test.csv, CamelFileParent=/tmp, CamelFilePath=/tmp/test.csv, CamelFileRelativePath=test.csv, userID=1001}, BodyType:String, Body:1001,SomeValue,MoreValues

需要更改什么才能使这项工作正常进行?

我还应该注意到我已经尝试过这种方法,使用各种语法选项来引用 header 值,但没有任何运气:

.enrich().simple("sql:select * from employees where emplid = :#${in.header.userID}").aggregate ...

来自 Camel 文档:

pollEnrich or enrich does not access any data from the current Exchange which means when polling it cannot use any of the existing headers you may have set on the Exchange.

实现您想要的目标的推荐方法是使用 recipientList,所以我建议您继续阅读。

编辑:

正如 Ricardo Zanini 在他的回答中正确指出的那样,从 2.16 开始,Camel 版本实际上可以实现这一点。由于 OP 使用的是 2.20,我的回答无效。

不过,我会保留我的回答,但要指出的是,这仅在您使用的版本低于 2.16 时才有效。

来自docs

From Camel 2.16 onwards both enrich and pollEnrich supports dynamic endpoints that uses an Expression to compute the uri, which allows to use data from the current Exchange. In other words all what is told above no longer apply and it just works.

由于您使用的是 2.20,我想您可以试试这个例子:

from("file:///tmp?fileName=test.csv")
.split()
.tokenize("\n")
.streaming()
.parallelProcessing(true)
.setHeader("userID", constant("1001"))
//.enrich("sql:select emplid,name from employees where emplid = '1001'",
.enrich("sql:select name from employees where emplid = ':#${in.header.userID}'",
    new AggregationStrategy() {
        public Exchange aggregate(Exchange oldExchange,
                                    Exchange newExchange)    {...

查看文档中的 Expression 主题以获取更多示例。

总而言之,表达式可以是:

"sql:select name from employees where emplid = ':#${in.header.userID}'"

编辑:

抱歉,我错过了 :# suffix. You could see a unit test working here

只需注意列类型。如果它是整数,则不需要引号。

干杯!