在尝试使用“TABLE_NAME.insert”将数据插入分区 table 时获取“单分区插入语句中的错误分区元组”

Getting `Mispartitioned tuple in single-partition insert statement` while trying to insert data into a partitioned table using `TABLE_NAME.insert`

我正在使用给定的插入语句

创建一个 VoltDB table
CREATE TABLE EMPLOYEE (
    ID VARCHAR(4) NOT NULL,
    CODE VARCHAR(4) NOT NULL,
    FIRST_NAME VARCHAR(30) NOT NULL,
    LAST_NAME VARCHAR(30) NOT NULL,
    PRIMARY KEY (ID, CODE)
);

并将 table 与

分区
PARTITION TABLE EMPLOYEE ON COLUMN ID;

我已经编写了一个 spark 作业来将数据插入 VoltDB,我正在使用下面的 scala 代码将记录插入 VoltDB,如果我们不对 table

进行分区,代码运行良好
import org.voltdb._;
import org.voltdb.client._;
import scala.collection.JavaConverters._

val voltClient:Client = ClientFactory.createClient();
voltClient.createConnection("IP:PORT");

val empDf = spark.read.format("csv")
          .option("inferSchema", "true")
          .option("header", "true")
          .option("sep", ",")
          .load("/FileStore/tables/employee.csv")

// Code to convert scala seq to java varargs
def callProcedure(procName: String, parameters: Any*): ClientResponse =
    voltClient.callProcedure(procName, paramsToJavaObjects(parameters: _*): _*)

def paramsToJavaObjects(params: Any*) = params.map { param ⇒
    val value = param match {
      case None    ⇒ null
      case Some(v) ⇒ v
      case _       ⇒ param
    }
    value.asInstanceOf[AnyRef]
}

empDf.collect().foreach { row =>
  callProcedure("EMPLOYEE.insert", row.toSeq:_*);
}

但是如果我对 table

进行分区,我会遇到以下错误
Mispartitioned tuple in single-partition insert statement.
Constraint Type PARTITIONING, Table CatalogId EMPLOYEE
Relevant Tuples:
ID  CODE  FIRST_NAME  LAST_NAME 
--- ----- ----------- ----------
1   CD01  Naresh       "Joshi"
    at org.voltdb.client.ClientImpl.internalSyncCallProcedure(ClientImpl.java:485)
    at org.voltdb.client.ClientImpl.callProcedureWithClientTimeout(ClientImpl.java:324)
    at org.voltdb.client.ClientImpl.callProcedure(ClientImpl.java:260)
    at line4c569b049a9d4e51a3e8fda7cbb043de32.$read$$iw$$iw$$iw$$iw$$iw$$iw.callProcedure(command-3986740264398828:9)
    at line4c569b049a9d4e51a3e8fda7cbb043de40.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun.apply(command-3986740264399793:8)
    at line4c569b049a9d4e51a3e8fda7cbb043de40.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun.apply(command-3986740264399793:7)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)

我发现 link (https://forum.voltdb.com/forum/voltdb-discussions/building-voltdb-applications/1182-mispartitioned-tuple-in-single-partition-insert-statement) 与问题有关,并尝试使用以下查询对过程进行分区

PARTITION PROCEDURE EMPLOYEE.insert ON TABLE EMPLOYEE COLUMN ID;

PARTITION PROCEDURE EMPLOYEE.insert ON TABLE EMPLOYEE COLUMN ID [PARAMETER 0];

但是我在执行这些语句时遇到 [Ad Hoc DDL Input]: VoltDB DDL Error: "Partition references an undefined procedure "EMPLOYEE.insert"" 错误。

但是,我可以使用 @AdHoc 存储过程插入数据,但我无法找出问题或解决上述场景的问题,我正在使用 EMPLOYEE.insert 存储过程将数据插入分区 table.

过程 "EMPLOYEE.insert" 就是所谓的 "default" 过程,它是在您创建 table EMPLOYEE 时由 VoltDB 自动生成的。它已经根据 table 的分区自动分区,因此您不能调用 "PARTITION PROCEDURE EMPLOYEE.insert ..." 来覆盖它。

我认为正在发生的事情是程序按 ID 列分区,在 EMPLOYEE table 中该列是 VARCHAR。因此输入参数应该是一个字符串。但是,我认为您的代码以某种方式读取 CSV 文件并将第一列作为 int 值传递。

java 客户端 callProcedure(String procedureName, Object... params) 方法接受可变参数作为参数。这可以是任何 Object[]。服务器上某处会进行检查,其中参数的# 必须与过程所期望的# 匹配,否则过程调用将被拒绝,并且永远不会执行。但是,我认为在您的情况下,# of arguments 是可以的,因此它会尝试执行该过程。它散列对应于 ID 的第一个参数值,然后确定应该转到哪个分区。调用被路由到该分区以供执行。当它执行时,它会尝试插入这些值,但是会再次检查该分区的分区键值是否正确,但失败了。

我认为如果值作为 int 传入,它会散列到错误的分区。然后在该分区中,它尝试将值插入到列中,这是一个 VARCHAR,因此它可能将 int 隐式转换为 String,但它不在正确的分区中,因此插入失败并出现此错误 "Mispartitioned tuple in single-partition insert statement."如果您编写 java 存储过程并将错误的列配置为分区键,则会出现相同的错误。

披露:我在 VoltDB 工作。