Spark joinWithCassandraTable() on map multiple partition key ERROR
Spark joinWithCassandraTable() on map multiple partition key ERROR
我正在尝试使用以下方法对巨大的 Cassandra table 的一小部分进行过滤:
val snapshotsFiltered = sc.parallelize(startDate to endDate).map(TableKey(_2)).joinWithCassandraTable("listener","snapshots_test_b")
我想将 cassandra table 中的行映射到属于分区键的 'created' 列。
我的table键(table的分区键)定义为:
case class TableKey(imei: String, created: Long, when: Long)
结果出错:
[error] /home/ubuntu/scala/test/test.scala:61: not enough arguments for method apply: (imei: String, created: Long)test.TableKey in object TableKey.
[error] Unspecified value parameter created.
[error] val snapshotsFiltered = sc.parallelize(startDate to endDate).map(TableKey(_2)).joinWithCassandraTable("listener","snapshots_test_b")
[error] ^
[error] one error found
[error] (compile:compile) Compilation failed
中一样,它仅与分区键中的一个对象一起工作
为什么多个分区键有问题?- 已回答。
编辑:我尝试以正确的形式使用 joinWithCassandraTable:
val snapshotsFiltered = sc.parallelize(startDate to endDate).map(TableKey("*",_,startDate)).joinWithCassandraTable("listener","snapshots_test_c")
当我尝试在 Spark 上 运行 它时没有错误,但它永远卡在“[stage 0:> (0+2)/2]”...
出了什么问题?
错误告诉您 class TableKey
需要 3 个组件来初始化,但只传递了一个参数。这是一个 Scala 编译错误,与 C* 或 Spark 无关。
val snapshotsFiltered = sc.parallelize(startDate to endDate)
.map(TableKey(_2)) /// Table Key does not have a single element constructor so this will fail
.joinWithCassandraTable("listener","snapshots_test_b")
但一般来说,C* 使用整个 partition key
来确定特定行所在的位置。因此,只有知道整个 partition key
才能有效地提取数据,因此只传递其中的一部分没有任何价值。
joinWithCassandraTable 需要完整的 partition key
值,以便它可以有效地完成它的工作。如果您只有 parition key
的一部分,您将需要执行完整的 table 扫描并使用 Spark 进行过滤。
如果您只想根据 clustering column
进行过滤,您可以通过将 where
子句下推到 C* 来实现,例如
sc.cassandraTable("ks","test").where("clustering_key > someValue")
我正在尝试使用以下方法对巨大的 Cassandra table 的一小部分进行过滤:
val snapshotsFiltered = sc.parallelize(startDate to endDate).map(TableKey(_2)).joinWithCassandraTable("listener","snapshots_test_b")
我想将 cassandra table 中的行映射到属于分区键的 'created' 列。
我的table键(table的分区键)定义为:
case class TableKey(imei: String, created: Long, when: Long)
结果出错:
中一样,它仅与分区键中的一个对象一起工作[error] /home/ubuntu/scala/test/test.scala:61: not enough arguments for method apply: (imei: String, created: Long)test.TableKey in object TableKey. [error] Unspecified value parameter created. [error] val snapshotsFiltered = sc.parallelize(startDate to endDate).map(TableKey(_2)).joinWithCassandraTable("listener","snapshots_test_b") [error] ^ [error] one error found [error] (compile:compile) Compilation failed
为什么多个分区键有问题?- 已回答。
编辑:我尝试以正确的形式使用 joinWithCassandraTable:
val snapshotsFiltered = sc.parallelize(startDate to endDate).map(TableKey("*",_,startDate)).joinWithCassandraTable("listener","snapshots_test_c")
当我尝试在 Spark 上 运行 它时没有错误,但它永远卡在“[stage 0:> (0+2)/2]”...
出了什么问题?
错误告诉您 class TableKey
需要 3 个组件来初始化,但只传递了一个参数。这是一个 Scala 编译错误,与 C* 或 Spark 无关。
val snapshotsFiltered = sc.parallelize(startDate to endDate)
.map(TableKey(_2)) /// Table Key does not have a single element constructor so this will fail
.joinWithCassandraTable("listener","snapshots_test_b")
但一般来说,C* 使用整个 partition key
来确定特定行所在的位置。因此,只有知道整个 partition key
才能有效地提取数据,因此只传递其中的一部分没有任何价值。
joinWithCassandraTable 需要完整的 partition key
值,以便它可以有效地完成它的工作。如果您只有 parition key
的一部分,您将需要执行完整的 table 扫描并使用 Spark 进行过滤。
如果您只想根据 clustering column
进行过滤,您可以通过将 where
子句下推到 C* 来实现,例如
sc.cassandraTable("ks","test").where("clustering_key > someValue")