joinWithCassandraTable 在 table 大小增长时变得更慢
joinWithCassandraTable getting much slower on growing table size
我目前正在使用这个堆栈:
- Cassandra 2.2(多节点)
- Spark/Streaming 1.4.1
- Spark-Cassandra-连接器 1.4.0-M3
我有这个 DStream[Ids],其中 RDD 计数大约 6000-7000 个元素。 id
是分区键。
val ids: DStream[Ids] = ...
ids.joinWithCassandraTable(keyspace, tableName, joinColumns = SomeColumns("id"))
随着 tableName
变大,假设大约 30k "rows",查询需要更长的时间,而且我无法保持在批持续时间阈值以下。它的性能类似于使用大量 IN
-子句,据我所知这是不可取的。
有没有更有效的方法?
回答:
在与 Cassandra 进行连接之前,请始终记住使用 repartitionByCassandraReplica
对本地 RDD 重新分区,以确保每个分区仅针对本地 Cassandra 节点工作。在我的例子中,我还必须增加加入本地 RDD/DStream 的分区,以便任务在工作人员之间均匀分布。
"id" 是您 table 中的分区键吗?如果不是,我认为它需要,否则你可能正在做一个 table 扫描,随着 table 变大,它会 运行 逐渐变慢。
此外,为了使用此方法获得良好的性能,我相信您需要在您的 ids RDD 上使用 repartitionByCassandraReplica() 操作,以便连接是每个节点上的本地操作。
见this。
我目前正在使用这个堆栈:
- Cassandra 2.2(多节点)
- Spark/Streaming 1.4.1
- Spark-Cassandra-连接器 1.4.0-M3
我有这个 DStream[Ids],其中 RDD 计数大约 6000-7000 个元素。 id
是分区键。
val ids: DStream[Ids] = ...
ids.joinWithCassandraTable(keyspace, tableName, joinColumns = SomeColumns("id"))
随着 tableName
变大,假设大约 30k "rows",查询需要更长的时间,而且我无法保持在批持续时间阈值以下。它的性能类似于使用大量 IN
-子句,据我所知这是不可取的。
有没有更有效的方法?
回答:
在与 Cassandra 进行连接之前,请始终记住使用 repartitionByCassandraReplica
对本地 RDD 重新分区,以确保每个分区仅针对本地 Cassandra 节点工作。在我的例子中,我还必须增加加入本地 RDD/DStream 的分区,以便任务在工作人员之间均匀分布。
"id" 是您 table 中的分区键吗?如果不是,我认为它需要,否则你可能正在做一个 table 扫描,随着 table 变大,它会 运行 逐渐变慢。
此外,为了使用此方法获得良好的性能,我相信您需要在您的 ids RDD 上使用 repartitionByCassandraReplica() 操作,以便连接是每个节点上的本地操作。
见this。