Spark Scala Cassandra connector: deleting all rows fails with "IllegalArgumentException: requirement failed"
Create the table:
CREATE TABLE test.word_groups (group text, word text, count int, PRIMARY KEY (group, word));
Insert data:
INSERT INTO test.word_groups (group, word, count) VALUES ('A-group', 'raj', 0);
INSERT INTO test.word_groups (group, word, count) VALUES ('b-group', 'jaj', 0);
INSERT INTO test.word_groups (group, word, count) VALUES ('A-group', 'raff', 3);
SELECT * FROM word_groups;
group | word | count
---------+------+-------
b-group | jaj | 0
A-group | raff | 3
A-group | raj | 0
Script:
import org.apache.spark.sql.functions.col
import com.datastax.spark.connector._

val cassandraUrl = "org.apache.spark.sql.cassandra"
val wordGroup: Map[String, String] = Map(
  "table" -> "word_groups", "keyspace" -> "test", "cluster" -> "test-cluster")
val groupData = spark.read.format(cassandraUrl).options(wordGroup).load()
  .where(col("group") === "b-group")
groupData.rdd.deleteFromCassandra("test", "word_groups")
Exception:
java.lang.IllegalArgumentException: requirement failed: Invalid row size: 3 instead of 2.
at scala.Predef$.require(Predef.scala:224)
at com.datastax.spark.connector.writer.SqlRowWriter.readColumnValues(SqlRowWriter.scala:23)
at com.datastax.spark.connector.writer.SqlRowWriter.readColumnValues(SqlRowWriter.scala:12)
at com.datastax.spark.connector.writer.BoundStatementBuilder.bind(BoundStatementBuilder.scala:102)
at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:105)
at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:30)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:30)
at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal.apply(TableWriter.scala:229)
at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal.apply(TableWriter.scala:198)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo.apply(CassandraConnector.scala:112)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo.apply(CassandraConnector.scala:111)
at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:129)
at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:111)
at com.datastax.spark.connector.writer.TableWriter.writeInternal(TableWriter.scala:198)
at com.datastax.spark.connector.writer.TableWriter.delete(TableWriter.scala:194)
at com.datastax.spark.connector.RDDFunctions$$anonfun$deleteFromCassandra.apply(RDDFunctions.scala:119)
at com.datastax.spark.connector.RDDFunctions$$anonfun$deleteFromCassandra.apply(RDDFunctions.scala:119)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
21/08/11 09:01:24 WARN TaskSetManager: Lost task 0.0 in stage 11.0 (TID 2953, localhost, executor driver): java.lang.IllegalArgumentException: requirement failed: Invalid row size: 3 instead of 2.
Spark version: 2.4.4
Spark Cassandra Connector version: 2.5.0
Spark Cassandra Connector documentation: https://github.com/datastax/spark-cassandra-connector/blob/master/doc/5_saving.md#deleting-rows-and-columns
I am trying to delete all of these records, including the primary key columns.
Is there any workaround?
FYI: I need to delete all records for the group 'A-group' from the word_groups table, including the primary/partition key (in CQL terms, the equivalent of DELETE FROM test.word_groups WHERE group = 'A-group';).
This is an interesting change in 2.5.x that I wasn't aware of: you now need the correct row size even when keyColumns is specified, although it previously worked without that. It looks like a bug to me.
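To see the mismatch behind the "3 instead of 2" message concretely (a quick check, not part of the fix; groupData is the DataFrame from the question, and the column order may vary):

// The loaded DataFrame carries all three table columns...
groupData.columns   // Array(group, word, count)        -> row size 3
// ...but the primary key of word_groups is (group, word) -> expected size 2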
When deleting whole rows you only need to keep the primary key columns, so change the delete to:
groupData.select("group", "word").rdd.deleteFromCassandra("test", "word_groups")
But in your case it is better to delete by the partition key column (group, the first component of the primary key); that way you get only a single tombstone for the whole partition (you still need to select only the necessary columns):
import com.datastax.spark.connector._

groupData.select("group").rdd
  .deleteFromCassandra("test", "word_groups", keyColumns = SomeColumns("group"))
And you don't even need to read the input data from Cassandra at all: if you know the values of the partition key, you can simply create an RDD of keys and delete the data (similar to what is shown in the docs):
case class Key(group: String)

sc.parallelize(Seq(Key("b-group")))
  .deleteFromCassandra("test", "word_groups", keyColumns = SomeColumns("group"))