ORA-01747: invalid user.table.column on NiFi PutDatabaseRecord

I'm trying to update some columns in an Oracle database with NiFi.

I have this part of the flow:

I have a problem with the last processor, PutDatabaseRecord:

nifi_ml_nifi.1.rb4f8g690fro@KoshDomain    | 2021-07-12 17:34:20,919 ERROR [Timer-Driven Process Thread-2] o.a.n.p.standard.PutDatabaseRecord PutDatabaseRecord[id=017a10b4-fe2c-1b89-f752-67545ebb8406] Failed to put Records to database for StandardFlowFileRecord[uuid=db4a0554-6ea8-4cca-b1da-11bdacef1ccc,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1625839380170-6543, container=default, section=399], offset=292967, length=68],offset=0,name=f4d1875f-bec4-4944-ad8e-7955048148f3,size=68]. Routing to failure.: java.sql.BatchUpdateException: ORA-01747: invalid user.table.column, table.column, or column specification
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain    | 
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain    | java.sql.BatchUpdateException: ORA-01747: invalid user.table.column, table.column, or column specification
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain    | 
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain    |     at oracle.jdbc.driver.OraclePreparedStatement.executeLargeBatch(OraclePreparedStatement.java:10032)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain    |     at oracle.jdbc.driver.T4CPreparedStatement.executeLargeBatch(T4CPreparedStatement.java:1364)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain    |     at oracle.jdbc.driver.OraclePreparedStatement.executeBatch(OraclePreparedStatement.java:9839)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain    |     at oracle.jdbc.driver.OracleStatementWrapper.executeBatch(OracleStatementWrapper.java:234)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain    |     at org.apache.commons.dbcp2.DelegatingStatement.executeBatch(DelegatingStatement.java:242)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain    |     at org.apache.commons.dbcp2.DelegatingStatement.executeBatch(DelegatingStatement.java:242)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain    |     at sun.reflect.GeneratedMethodAccessor156.invoke(Unknown Source)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain    |     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain    |     at java.lang.reflect.Method.invoke(Method.java:498)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain    |     at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:254)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain    |     at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler.access$100(StandardControllerServiceInvocationHandler.java:38)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain    |     at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler$ProxiedReturnObjectInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:240)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain    |     at com.sun.proxy.$Proxy149.executeBatch(Unknown Source)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain    |     at org.apache.nifi.processors.standard.PutDatabaseRecord.executeDML(PutDatabaseRecord.java:754)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain    |     at org.apache.nifi.processors.standard.PutDatabaseRecord.putToDatabase(PutDatabaseRecord.java:841)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain    |     at org.apache.nifi.processors.standard.PutDatabaseRecord.onTrigger(PutDatabaseRecord.java:487)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain    |     at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain    |     at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1173)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain    |     at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:214)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain    |     at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent.run(TimerDrivenSchedulingAgent.java:117)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain    |     at org.apache.nifi.engine.FlowEngine.run(FlowEngine.java:110)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain    |     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain    |     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain    |     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain    |     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain    |     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain    |     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
nifi_ml_nifi.1.rb4f8g690fro@KoshDomain    |     at java.lang.Thread.run(Thread.java:748)

This is the configuration of the failing processor:

This is the schema for the RecordReader:

{
    "name": "load_date",
    "type": "record",
    "namespace": "maxi",
    "fields": [
      {
        "name": "doc_id",
        "type": "int"
      },
      {
        "name": "line_id",
        "type": "int"
      },
      {
        "name": "load_date",
        "type": "string"
      }
    ]
  }

This is an example of the JSON data arriving at the processor:

[{"doc_id":1795576199,"line_id":689617855,"load_date":"2021-34-12"}]

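UPDATE

OK, I switched PutDatabaseRecord to debug mode and ran a single record through it to capture the full debug output from the processor. This is the head of the log, right at the point where the processor starts handling the record: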

nifi_ml_nifi.1.zutify8jh9sv@KoshDomain    | 2021-07-13 14:15:23,951 INFO [NiFi Web Server-456] o.a.n.c.s.StandardProcessScheduler Starting SplitJson[id=9f810155-017a-1000-890c-4b1e382a161e]
nifi_ml_nifi.1.zutify8jh9sv@KoshDomain    | 2021-07-13 14:15:23,951 INFO [NiFi Web Server-456] o.a.n.controller.StandardProcessorNode Starting SplitJson[id=9f810155-017a-1000-890c-4b1e382a161e]
nifi_ml_nifi.1.zutify8jh9sv@KoshDomain    | 2021-07-13 14:15:23,964 INFO [Timer-Driven Process Thread-1] o.a.n.c.s.TimerDrivenSchedulingAgent Scheduled SplitJson[id=9f810155-017a-1000-890c-4b1e382a161e] to run with 1 threads
nifi_ml_nifi.1.zutify8jh9sv@KoshDomain    | 2021-07-13 14:15:23,976 ERROR [Timer-Driven Process Thread-1] o.a.n.p.standard.PutDatabaseRecord PutDatabaseRecord[id=017a10b4-fe2c-1b89-f752-67545ebb8406] Failed to put Records to database for StandardFlowFileRecord[uuid=9c53fd8b-775b-42a6-bcae-fb4208a40985,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1626174735665-61, container=default, section=61], offset=776075, length=330],offset=0,name=51510f97-7a54-4394-966a-12775653acb0,size=66]. Routing to failure.: java.sql.SQLDataException: Cannot map field 'doc_id' to any column in the database
nifi_ml_nifi.1.zutify8jh9sv@KoshDomain    | Columns: 
nifi_ml_nifi.1.zutify8jh9sv@KoshDomain    | java.sql.SQLDataException: Cannot map field 'doc_id' to any column in the database
nifi_ml_nifi.1.zutify8jh9sv@KoshDomain    | Columns: 
nifi_ml_nifi.1.zutify8jh9sv@KoshDomain    |     at org.apache.nifi.processors.standard.PutDatabaseRecord.generateUpdate(PutDatabaseRecord.java:1073)

(I added another processor there, SplitJson.)

Also, I created a table in my own schema in the database.

This is the DDL:

CREATE TABLE psheom.ml_task (
  doc_id number, 
  line_id number,
  load_date DATE,
  CONSTRAINT pk_ml_task PRIMARY KEY(doc_id, line_id) 
)

This is the result of DESCRIBE psheom.ml_task:

Name      Null?    Type   
--------- -------- ------ 
DOC_ID    NOT NULL NUMBER 
LINE_ID   NOT NULL NUMBER 
LOAD_DATE          DATE  

UPDATE

I tried setting up an ExecuteSQL processor as @pmdba suggested, but got an error. This is the configuration:

This is the error:

nifi_ml_nifi.1.zutify8jh9sv@KoshDomain    | 2021-07-13 14:57:19,137 ERROR [Timer-Driven Process Thread-6] o.a.nifi.processors.standard.ExecuteSQL ExecuteSQL[id=9fb97e49-017a-1000-bae9-256f569c239b] Unable to execute SQL select query describe psheom.ml_task due to java.sql.SQLSyntaxErrorException: ORA-00900: invalid SQL statement
nifi_ml_nifi.1.zutify8jh9sv@KoshDomain    | . No FlowFile to route to failure: java.sql.SQLSyntaxErrorException: ORA-00900: invalid SQL statement
nifi_ml_nifi.1.zutify8jh9sv@KoshDomain    | 
nifi_ml_nifi.1.zutify8jh9sv@KoshDomain    | java.sql.SQLSyntaxErrorException: ORA-00900: invalid SQL statement
nifi_ml_nifi.1.zutify8jh9sv@KoshDomain    | 
nifi_ml_nifi.1.zutify8jh9sv@KoshDomain    |     at oracle.jdbc.driver.T4CTTIoer11.processError(T4CTTIoer11.java:494)

UPDATE

I set Translate Field Names to false, but the result is the same.

OK, so we need to do "several things" to UPDATE some records in Oracle.

First, don't use DBeaver! It is very glitchy! Use only SQL Developer to work with Oracle.

Second, open a session as the same user that is configured for the connection pool on the NiFi side, and run DESCRIBE for the table you need to update:

DESCRIBE GOODS.ML_TASK

OK, it worked!
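
Note that DESCRIBE is a client command of SQL*Plus and SQL Developer rather than a SQL statement, which is why sending it through a JDBC connection (as ExecuteSQL does) fails with ORA-00900. As a possible alternative, roughly the same information can be read from the ALL_TAB_COLUMNS data dictionary view. The Python sketch below only builds the query text to paste into ExecuteSQL; the helper name describe_query is hypothetical, and GOODS / ML_TASK are simply the names from the example above.

```python
import re

# Hypothetical helper: builds a data-dictionary query that returns roughly
# what DESCRIBE shows, so it can be run through a plain JDBC connection.
_IDENT = re.compile(r"^[A-Za-z][A-Za-z0-9_$#]*$")

def describe_query(owner: str, table: str) -> str:
    for name in (owner, table):
        # Accept only plain, unquoted identifiers to keep the text safe.
        if not _IDENT.match(name):
            raise ValueError(f"not a plain Oracle identifier: {name!r}")
    # Unquoted identifiers are stored in uppercase in the dictionary.
    return (
        "SELECT column_name, data_type, nullable "
        "FROM all_tab_columns "
        f"WHERE owner = '{owner.upper()}' AND table_name = '{table.upper()}' "
        "ORDER BY column_id"
    )

print(describe_query("goods", "ml_task"))
```

If the query returns no rows for the connection pool user, that user most likely cannot see the table at all, which would have to be resolved before looking at field names.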

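Third, all entity names need to be in uppercase, and it is a good idea to set Translate Field Names to false, because with translation enabled the underscores are stripped, i.e. doc_id -> docid.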
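
To see why the case of the names matters, here is a rough Python model of the matching. It is based on my understanding of how PutDatabaseRecord compares names (an assumption, not the processor's actual code): with Translate Field Names enabled, both sides are uppercased and stripped of underscores before comparison; with it disabled, the names must match exactly. The function names are hypothetical.

```python
def normalize(name: str, translate: bool) -> str:
    # Assumed behaviour: translation uppercases and drops underscores.
    return name.upper().replace("_", "") if translate else name

def unmatched_fields(fields, columns, translate):
    # Return the record fields that have no matching database column.
    known = {normalize(c, translate) for c in columns}
    return [f for f in fields if normalize(f, translate) not in known]

columns = ["DOC_ID", "LINE_ID", "LOAD_DATE"]  # as Oracle reports them

print(unmatched_fields(["doc_id", "line_id"], columns, translate=False))
# ['doc_id', 'line_id']
print(unmatched_fields(["DOC_ID", "LINE_ID"], columns, translate=False))
# []
```

Under this model, lowercase field names only fail once translation is switched off, which is why the uppercase names go together with that setting.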

Fourth, the JSON at the processor's input needs to have uppercase field names:

[
  {"DOC_ID":1799041400,"LINE_ID":694098344,"LOAD_DATE":"14-Jul-21"},
  {"DOC_ID":1802019315,"LINE_ID":697885808,"LOAD_DATE":"14-Jul-21"}
]

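If you have lowercase fields somewhere, use an UpdateRecord processor with a reader and a writer, where the reader uppercases the fields.

The schema defined for the PutDatabaseRecord reader MUST be in uppercase: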

{
    "name": "load_date",
    "type": "record",
    "namespace": "maxi",
    "fields": [
      {
        "name": "DOC_ID",
        "type": "int"
      },
      {
        "name": "LINE_ID",
        "type": "int"
      },
      {
        "name": "LOAD_DATE",
        "type": "string"
      }
   ]
}

So you have:

UpdateRecord:

  1. IncomingJsonReader, which accepts records with lowercase names and converts them to uppercase
  2. IncomingJsonWriter

PutDatabaseRecord:

  1. OutgoingJsonReader

They all use the same JSON schema.
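
Outside NiFi, the effect of the renaming step can be illustrated with a few lines of Python. This is only a sketch of the transformation, not of how UpdateRecord is configured; the function name uppercase_keys is hypothetical, and the sample record is the one from the question.

```python
import json

def uppercase_keys(records):
    # Rename every top-level field to uppercase; values are left untouched.
    return [{key.upper(): value for key, value in rec.items()} for rec in records]

incoming = '[{"doc_id":1795576199,"line_id":689617855,"load_date":"2021-34-12"}]'
print(json.dumps(uppercase_keys(json.loads(incoming))))
```

Running a sample FlowFile through something like this is a quick way to check what PutDatabaseRecord should be receiving.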

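Fifth, as you can see, use the string type for dates.

You can pass the date as a number of milliseconds, but although that worked in my own schema, it did not work in the production schema. So you need to query NLS_DATE_FORMAT.

Put the appropriate query into an ExecuteSQL processor: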

select * from nls_session_parameters where parameter = 'NLS_DATE_FORMAT'

The result that arrives in the queue looks like this:

[{"PARAMETER": "NLS_DATE_FORMAT", "VALUE": "DD-MON-RR"}]

Use it to format the date appropriately.
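
As an illustration of what that means for DD-MON-RR, the Python sketch below renders a date in that layout. It assumes English month abbreviations (the NLS_DATE_LANGUAGE of your session may differ) and uses a fixed table of month names so the result does not depend on the locale of the machine running it. The function name to_dd_mon_rr is hypothetical.

```python
from datetime import date

MONTHS = ("Jan", "Feb", "Mar", "Apr", "May", "Jun",
          "Jul", "Aug", "Sep", "Oct", "Nov", "Dec")

def to_dd_mon_rr(d: date) -> str:
    # DD = two-digit day, MON = abbreviated month name, RR = two-digit year.
    return f"{d.day:02d}-{MONTHS[d.month - 1]}-{d.year % 100:02d}"

print(to_dd_mon_rr(date(2021, 7, 14)))  # 14-Jul-21
```

The output has the same shape as the LOAD_DATE values in the uppercase JSON example above.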

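Sixth, put the correct date format in the right place in the Expression Language expression, for example in UpdateRecord in my case:

Seventh, make sure that only one or a few records go through the queue, so that you can follow the log. Also put PutDatabaseRecord into DEBUG mode:

That way it will tell you whether fetching the schema from the database succeeded.

So... happy updating of your Oracle tables!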