Hive DML 事务 (Update/Delete) 不适用于子查询

Hive DML transactions (Update/Delete) not working for subqueries

我知道 Hive/Hadoop 不适用于 Update/Delete 但我的要求是根据 table person21 的数据更新 table person20。随着 Hive 和 ORC 的进步,它支持 ACID,但看起来仍然不成熟。

$ hive --version 

配置单元 1.1.0-cdh5.6.0

以下是我为测试更新逻辑而执行的详细步骤。

CREATE TABLE person20(
  persid int,
  lastname string,
  firstname string)
CLUSTERED BY (
  persid)
INTO 1 BUCKETS
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
LOCATION
  'hdfs://hostname.com:8020/user/hive/warehouse/person20'
TBLPROPERTIES (
  'COLUMN_STATS_ACCURATE'='true',
  'numFiles'='3',
  'numRows'='2',
  'rawDataSize'='348',
  'totalSize'='1730',
  'transactional'='true',
  'transient_lastDdlTime'='1489668385')

插入语句:

INSERT INTO TABLE person20 VALUES (0,'PP','B'),(2,'X','Y');

Select 语句:

set hive.cli.print.header=true;

select * from person20;

persid lastname  firstname
2       X       Y
0       PP      B

我有另一个 table 是 person20 的副本,即 person21:

CREATE TABLE person21(
  persid int,
  lastname string,
  firstname string)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  'hdfs://hostname.com:8020/user/hive/warehouse/person21'
TBLPROPERTIES (
  'COLUMN_STATS_ACCURATE'='true',
  'numFiles'='1',
  'numRows'='2',
  'rawDataSize'='11',
  'totalSize'='13',
  'transient_lastDdlTime'='1489668344')

插入语句:

INSERT INTO TABLE person20 VALUES (0,'SS','B'),(2,'X','Y');

Select 语句:

select * from person21;

persid lastname firstname
2       X1       Y
0       SS       B

我想实现 MERGE 逻辑:

Merge into  person20 p20 USING person21 p21
ON (p20.persid=p21.persid)
WHEN MATCHED THEN
UPDATE set p20.lastname=p21.lastname

其他选项是关联子查询更新:-

hive -e "set hive.auto.convert.join.noconditionaltask.size = 10000000; set hive.support.concurrency = true; set hive.enforce.bucketing = true; set hive.exec.dynamic.partition.mode = nonstrict; set hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; set hive.compactor.initiator.on = true;
set hive.compactor.worker.threads = 1 ; UPDATE person20 SET lastname = (select lastname from person21 where person21.lastname=person20.lastname);" 

Logging initialized using configuration in jar:file:/usr/lib/hive/lib/hive-common-1.1.0-cdh5.6.0.jar!/hive-log4j.properties NoViableAltException(224@[400:1: precedenceEqualExpression : ( (left= precedenceBitwiseOrExpression -> $left) ( ( KW_NOT precedenceEqualNegatableOperator notExpr= precedenceBitwiseOrExpression ) -> ^( KW_NOT ^( precedenceEqualNegatableOperator $precedenceEqualExpression $notExpr) ) | ( precedenceEqualOperator equalExpr= precedenceBitwiseOrExpression ) -> ^( precedenceEqualOperator $precedenceEqualExpression $equalExpr) | ( KW_NOT KW_IN LPAREN KW_SELECT )=> ( KW_NOT KW_IN subQueryExpression ) -> ^( KW_NOT ^( TOK_SUBQUERY_EXPR ^( TOK_SUBQUERY_OP KW_IN ) subQueryExpression $precedenceEqualExpression) ) | ( KW_NOT KW_IN expressions ) -> ^( KW_NOT ^( TOK_FUNCTION KW_IN $precedenceEqualExpression expressions ) ) | ( KW_IN LPAREN KW_SELECT )=> ( KW_IN subQueryExpression ) -> ^( TOK_SUBQUERY_EXPR ^( TOK_SUBQUERY_OP KW_IN ) subQueryExpression $precedenceEqualExpression) | ( KW_IN expressions ) -> ^( TOK_FUNCTION KW_IN $precedenceEqualExpression expressions ) | ( KW_NOT KW_BETWEEN (min= precedenceBitwiseOrExpression ) KW_AND (max= precedenceBitwiseOrExpression ) ) -> ^( TOK_FUNCTION Identifier["between"] KW_TRUE $left $min $max) | ( KW_BETWEEN (min= precedenceBitwiseOrExpression ) KW_AND (max= precedenceBitwiseOrExpression ) ) -> ^( TOK_FUNCTION Identifier["between"] KW_FALSE $left $min $max) )* | ( KW_EXISTS LPAREN KW_SELECT )=> ( KW_EXISTS subQueryExpression ) -> ^( TOK_SUBQUERY_EXPR ^( TOK_SUBQUERY_OP KW_EXISTS ) subQueryExpression ) );]) at org.antlr.runtime.DFA.noViableAlt(DFA.java:158) at org.antlr.runtime.DFA.predict(DFA.java:116) at org.apache.hadoop.hive.ql.parse.HiveParser_IdentifiersParser.precedenceEqualExpression(HiveParser_IdentifiersParser.java:8651) at org.apache.hadoop.hive.ql.parse.HiveParser_IdentifiersParser.precedenceNotExpression(HiveParser_IdentifiersParser.java:9673) at org.apache.hadoop.hive.ql.parse.HiveParser_IdentifiersParser.precedenceAndExpression(HiveParser_IdentifiersParser.java:9792) at org.apache.hadoop.hive.ql.parse.HiveParser_IdentifiersParser.precedenceOrExpression(HiveParser_IdentifiersParser.java:9951) at org.apache.hadoop.hive.ql.parse.HiveParser_IdentifiersParser.expression(HiveParser_IdentifiersParser.java:6567) at org.apache.hadoop.hive.ql.parse.HiveParser_IdentifiersParser.atomExpression(HiveParser_IdentifiersParser.java:6791) at org.apache.hadoop.hive.ql.parse.HiveParser_IdentifiersParser.precedenceFieldExpression(HiveParser_IdentifiersParser.java:6862) at org.apache.hadoop.hive.ql.parse.HiveParser_IdentifiersParser.precedenceUnaryPrefixExpression(HiveParser_IdentifiersParser.java:7247) at org.apache.hadoop.hive.ql.parse.HiveParser_IdentifiersParser.precedenceUnarySuffixExpression(HiveParser_IdentifiersParser.java:7307) at org.apache.hadoop.hive.ql.parse.HiveParser_IdentifiersParser.precedenceBitwiseXorExpression(HiveParser_IdentifiersParser.java:7491) at org.apache.hadoop.hive.ql.parse.HiveParser_IdentifiersParser.precedenceStarExpression(HiveParser_IdentifiersParser.java:7651) at org.apache.hadoop.hive.ql.parse.HiveParser_IdentifiersParser.precedencePlusExpression(HiveParser_IdentifiersParser.java:7811) at org.apache.hadoop.hive.ql.parse.HiveParser.precedencePlusExpression(HiveParser.java:44550) at org.apache.hadoop.hive.ql.parse.HiveParser.columnAssignmentClause(HiveParser.java:44206) at org.apache.hadoop.hive.ql.parse.HiveParser.setColumnsClause(HiveParser.java:44271) at org.apache.hadoop.hive.ql.parse.HiveParser.updateStatement(HiveParser.java:44417) at org.apache.hadoop.hive.ql.parse.HiveParser.execStatement(HiveParser.java:1616) at org.apache.hadoop.hive.ql.parse.HiveParser.statement(HiveParser.java:1062) at org.apache.hadoop.hive.ql.parse.ParseDriver.parse(ParseDriver.java:201) at org.apache.hadoop.hive.ql.parse.ParseDriver.parse(ParseDriver.java:166) at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:404) at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:305) at org.apache.hadoop.hive.ql.Driver.compileInternal(Driver.java:1119) at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1167) at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1055) at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1045) at org.apache.hadoop.hive.cli.CliDriver.processLocalCmd(CliDriver.java:207) at org.apache.hadoop.hive.cli.CliDriver.processCmd(CliDriver.java:159) at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:370) at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:305) at org.apache.hadoop.hive.cli.CliDriver.executeDriver(CliDriver.java:702) at org.apache.hadoop.hive.cli.CliDriver.run(CliDriver.java:675) at org.apache.hadoop.hive.cli.CliDriver.main(CliDriver.java:615) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.hadoop.util.RunJar.run(RunJar.java:221) at org.apache.hadoop.util.RunJar.main(RunJar.java:136) FAILED: ParseException line 1:33 cannot recognize input near 'select' 'lastname' 'from' in expression specification

我认为它不支持子查询。相同的语句适用于常量。

hive -e "set hive.auto.convert.join.noconditionaltask.size = 10000000; set hive.support.concurrency = true; set hive.enforce.bucketing = true; set hive.exec.dynamic.partition.mode = nonstrict; set hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; set hive.compactor.initiator.on = true;
set hive.compactor.worker.threads = 1 ; UPDATE person20 SET lastname = 'PP' WHERE  persid = 0;"

--这条语句更新记录成功

你能帮我找到在 HIVE 中执行 DML/Merge 操作的最佳策略吗?

你可以通过蛮力做到这一点:

  • 重新创建 table person20 但不是 ACID,根据虚拟列名称进行分区,并使用单个分区 'dummy'
  • 填充 person20person21
  • 创建工作 table tmpperson20 具有与 person20
  • 完全相同的结构和相同的 'dummy' 分区
  • INSERT INTO tmpperson20 PARTITION (dummy='dummy') SELECT p20.persid, p21.lastname, ... FROM person20 p20 JOIN person21 p21 ON p20.persid=p21.persid
  • INSERT INTO tmpperson20 PARTITION (dummy='dummy') SELECT * FROM person20 p20 WHERE NOT EXISTS (select p21.persid FROM person21 p21 WHERE p20.persid=p21.persid)
  • ALTER TABLE person20 DROP PARTITION (dummy='dummy')
  • ALTER TABLE person20 EXCHANGE PARTITION (dummy='dummy') WITH tmpperson20
  • 现在你可以放弃 tmpperson20

不过,由于分桶,使用 ACID table 可能会更棘手。


您也可以尝试使用一种在游标上迭代并在循环中应用单个更新的过程语言。对于大量更新来说效率很低...

HPL/SQL 实用程序随 Hive 2.x 一起提供,可能会安装在 Hive 1.x 之上,但我从未有机会尝试过。而且 Oracle 方言在 Hive 上感觉很奇怪...!

或者您可以在循环中使用 JDBC ResultSetPreparedStatement 开发一些自定义 Java 代码。