Spark joinWithCassandraTable() on map multiple partition key ERROR

Spark joinWithCassandraTable() on map multiple partition key ERROR

我正在尝试使用以下方法对巨大的 Cassandra table 的一小部分进行过滤:

val snapshotsFiltered = sc.parallelize(startDate to endDate).map(TableKey(_2)).joinWithCassandraTable("listener","snapshots_test_b")

我想将 cassandra table 中的行映射到属于分区键的 'created' 列。

我的table键(table的分区键)定义为:

case class TableKey(imei: String, created: Long, when: Long)

结果出错:

[error] /home/ubuntu/scala/test/test.scala:61: not enough arguments for method apply: (imei: String, created: Long)test.TableKey in object TableKey. [error] Unspecified value parameter created. [error] val snapshotsFiltered = sc.parallelize(startDate to endDate).map(TableKey(_2)).joinWithCassandraTable("listener","snapshots_test_b") [error] ^ [error] one error found [error] (compile:compile) Compilation failed

Documentation.

中一样,它仅与分区键中的一个对象一起工作

为什么多个分区键有问题?- 已回答。

编辑:我尝试以正确的形式使用 joinWithCassandraTable:

val snapshotsFiltered = sc.parallelize(startDate to endDate).map(TableKey("*",_,startDate)).joinWithCassandraTable("listener","snapshots_test_c")

当我尝试在 Spark 上 运行 它时没有错误,但它永远卡在“[stage 0:> (0+2)/2]”...

出了什么问题?

错误告诉您 class TableKey 需要 3 个组件来初始化,但只传递了一个参数。这是一个 Scala 编译错误,与 C* 或 Spark 无关。

 val snapshotsFiltered = sc.parallelize(startDate to endDate)
   .map(TableKey(_2))  /// Table Key does not have a single element constructor so this will fail
   .joinWithCassandraTable("listener","snapshots_test_b")

但一般来说,C* 使用整个 partition key 来确定特定行所在的位置。因此,只有知道整个 partition key 才能有效地提取数据,因此只传递其中的一部分没有任何价值。

joinWithCassandraTable 需要完整的 partition key 值,以便它可以有效地完成它的工作。如果您只有 parition key 的一部分,您将需要执行完整的 table 扫描并使用 Spark 进行过滤。

如果您只想根据 clustering column 进行过滤,您可以通过将 where 子句下推到 C* 来实现,例如

sc.cassandraTable("ks","test").where("clustering_key > someValue")