Kafka JDBC Connect query causes ORA-00933: SQL command not properly ended
I have this Oracle SQL query:
SELECT * FROM
(SELECT SO_ORDER_KEY,QUEUE_TYPE,SYS_NO,
DENSE_RANK() OVER (PARTITION BY SO_ORDER_KEY ORDER BY SYS_NO DESC) ORDER_RANK
FROM TSY940)
WHERE ORDER_RANK=1;
When I run it in SQL Developer, it returns the desired result.
For some reason, when I use this query in the kafka-connect-jdbc properties, I get
ERROR Failed to run query for table TimestampIncrementingTableQuerier{name='null', query='SELECT * FROM (SELECT SO_ORDER_KEY,QUEUE_TYPE,SYS_NO,DENSE_RANK() OVER (PARTITION BY SO_ORDER_KEY ORDER BY SYS_NO DESC) ORDER_RANK FROM TSY940) WHERE ORDER_RANK=1', topicPrefix='TSY940', timestampColumn='SYS_NO', incrementingColumn='null'}: {} (io.confluent.connect.jdbc.source.JdbcSourceTask:247)
java.sql.SQLSyntaxErrorException: ORA-00933: SQL command not properly ended
at oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:450)
at oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:399)
at oracle.jdbc.driver.T4C8Oall.processError(T4C8Oall.java:1059)
at oracle.jdbc.driver.T4CTTIfun.receive(T4CTTIfun.java:522)
at oracle.jdbc.driver.T4CTTIfun.doRPC(T4CTTIfun.java:257)
at oracle.jdbc.driver.T4C8Oall.doOALL(T4C8Oall.java:587)
at oracle.jdbc.driver.T4CPreparedStatement.doOall8(T4CPreparedStatement.java:225)
at oracle.jdbc.driver.T4CPreparedStatement.doOall8(T4CPreparedStatement.java:53)
at oracle.jdbc.driver.T4CPreparedStatement.executeForDescribe(T4CPreparedStatement.java:774)
at oracle.jdbc.driver.OracleStatement.executeMaybeDescribe(OracleStatement.java:925)
at oracle.jdbc.driver.OracleStatement.doExecuteWithTimeout(OracleStatement.java:1111)
at oracle.jdbc.driver.OraclePreparedStatement.executeInternal(OraclePreparedStatement.java:4798)
at oracle.jdbc.driver.OraclePreparedStatement.executeQuery(OraclePreparedStatement.java:4845)
at oracle.jdbc.driver.OraclePreparedStatementWrapper.executeQuery(OraclePreparedStatementWrapper.java:1501)
at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.executeQuery(TimestampIncrementingTableQuerier.java:201)
at io.confluent.connect.jdbc.source.TableQuerier.maybeStartQuery(TableQuerier.java:84)
at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.maybeStartQuery(TimestampIncrementingTableQuerier.java:55)
at io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:225)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:179)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
This is my properties file:
name=poc-oracle-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.password = ********
connection.url = jdbc:oracle:thin:@***.***.***.**:****/******
connection.user = ***********
table.types=TABLE
query=SELECT * FROM (SELECT SO_ORDER_KEY,QUEUE_TYPE,SYS_NO,DENSE_RANK() OVER (PARTITION BY SO_ORDER_KEY ORDER BY SYS_NO DESC) ORDER_RANK FROM TSY940) WHERE ORDER_RANK=1
mode=timestamp
timestamp.column.name=SYS_NO
topic.prefix=TSY940
batch.max.rows = 500
poll.interval.ms=60000
transforms=createKey,extract
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields=SO_ORDER_KEY
transforms.extract.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extract.field=SO_ORDER_KEY
I'm using the ojdbc7 driver.
The WHERE clause seems to be the problem, because no exception occurs when I replace the query property with:
query=SELECT * FROM (SELECT SO_ORDER_KEY,QUEUE_TYPE,SYS_NO,DENSE_RANK() OVER (PARTITION BY SO_ORDER_KEY ORDER BY SYS_NO DESC) ORDER_RANK FROM TSY940)
You could try adding an alias and removing TO_CHAR:
SELECT SO_ORDER_KEY, QUEUE_TYPE, SYS_NO
FROM (SELECT SO_ORDER_KEY, QUEUE_TYPE, SYS_NO,
DENSE_RANK() OVER(PARTITION BY SO_ORDER_KEY ORDER BY SYS_NO DESC) AS ORDER_RANK
FROM TSY940) sub
WHERE sub.ORDER_RANK=1;
Then you could try this query, which eliminates ORDER_RANK entirely:
SELECT SO_ORDER_KEY, QUEUE_TYPE, SYS_NO
FROM (
SELECT SO_ORDER_KEY, QUEUE_TYPE, SYS_NO
FROM (SELECT SO_ORDER_KEY, QUEUE_TYPE, SYS_NO,
DENSE_RANK() OVER(PARTITION BY SO_ORDER_KEY ORDER BY SYS_NO DESC) AS ORDER_RANK
FROM TSY940) sub
WHERE sub.ORDER_RANK=1
)
The best way to find the cause is to enable a 10046 trace and look at the exact query that is sent to the database and raises ORA-00933.
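I know this question has an accepted answer, but the problem you ran into (and the reason wrapping the query in another query fixed it) is that when the Connect job runs in timestamp mode, Connect appends a clause to the query using the specified timestamp field. This means Connect is passing this invalid query to Oracle: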
SELECT *
FROM (SELECT
SO_ORDER_KEY,
QUEUE_TYPE,
SYS_NO,
DENSE_RANK()
OVER (
PARTITION BY SO_ORDER_KEY
ORDER BY SYS_NO DESC ) ORDER_RANK
FROM TSY940)
WHERE ORDER_RANK = 1
WHERE SYS_NO >= {last_run_timestamp}
See the last line? It is added by Connect. When you wrap the query in another SELECT, the appended clause no longer causes a problem.
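When I first ran into this, I found it annoying that the documentation did not mention it anywhere.
It fails because the connector appends its timestamp clause. You can use a query with predicates as an inner query, like this: select * from (select a,b,c from table where a='x'). As Alex mentioned, this is a workaround. The issue is tracked here: https://github.com/confluentinc/kafka-connect-jdbc/issues/566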
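As a rough sketch of that workaround applied to the properties file from the question (not tested against a live database), the original query, WHERE clause included, becomes the inner query of one more SELECT. Only the query line changes; the mode and timestamp column lines are copied from the question for context.

```properties
# Sketch only: the question's query wrapped in an outer SELECT, so that the
# clause the connector appends in timestamp mode follows a statement that has
# no top-level WHERE of its own.
query=SELECT * FROM (SELECT * FROM (SELECT SO_ORDER_KEY,QUEUE_TYPE,SYS_NO,DENSE_RANK() OVER (PARTITION BY SO_ORDER_KEY ORDER BY SYS_NO DESC) ORDER_RANK FROM TSY940) WHERE ORDER_RANK=1)
mode=timestamp
timestamp.column.name=SYS_NO
```

With this form, the appended timestamp clause is the only WHERE on the outermost SELECT, so the statement sent to Oracle should no longer contain two consecutive WHERE clauses.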