如何在读取 spark 中的 cassandra 分区时获得良好的性能?

How to get good performance on reading cassandra partitions in spark?

我正在使用 cassandra-connector.I 尝试以下读取解决方案从 cassandra 分区读取数据以激发 partitions.I 尝试通过尽可能多地创建 rdds 来并行化任务,但解决方案 ONE 和解决方案两人表现相同。

在解决方案一中,我可以立即看到 spark UI 中的阶段。我试图在解决方案二中避免一个 for 循环。

在解决方案二中,阶段出现在大量 time.Also 之后,随着用户 ID 数量的增加,然后在阶段出现在火花 UI 中之前的时间显着增加,解决方案二。

Version
 spark - 1.1
 Dse - 4.6
 cassandra-connector -1.1

Setup
 3 - Nodes with spark cassandra
 Each node has 1 core dedicated to this task.
 512MB ram for the executor memory.

我的 cassandra Table 模式,

 CREATE TABLE   test (
   user text,
   userid bigint,
   period timestamp,
   ip text,
   data blob,
   PRIMARY KEY((user,userid,period),ip)
   );

第一个解决方案:

 val users = List("u1","u2","u3")
 val period = List("2000-05-01","2000-05-01")
 val partitions = users.flatMap(x => period.map(y => (x,y))))
 val userids = 1 to 10
 for (userid <- userids){
 val rdds = partitions.map(x => sc.cassandraTable("test_keyspace","table1")
                                .select("data")
                                .where("user=?", x._1)
                                .where("period=?",x._2)
                                .where("userid=?,userid)
                          )
 val combinedRdd = sc.union(rdds)
 val result = combinedRdd.map(getDataFromColumns)
                    .coalesce(4)
                    .reduceByKey((x,y) => x+y)
                    .collect()
 result.foreach(prinltn)
 }

第二个解决方案:

 val users = List("u1","u2","u3")
 val period = List("2000-05-01","2000-05-01")
 val userids = 1 to 10
 val partitions = users.flatMap(x => period.flatMap(
                  y => userids.map(z => (x,y,z))))

 val rdds = partitions.map(x => sc.cassandraTable("test_keyspace","table1")
                                .select("data")
                                .where("user=?", x._1)
                                .where("period=?",x._2)
                                .where("userid=?,x._3)
                     )
 val combinedRdd = sc.union(rdds)
 val result = combinedRdd.map(getDataFromColumns)
                    .coalesce(4)
                    .reduceByKey((x,y) => x+y)
                    .collect()
 result.foreach(prinltn)

为什么解决方案二不比解决方案一快?

我的理解是,由于所有分区都是一次性查询的,并且数据是跨节点分布的,因此应该更快。 如有不妥请指正

首先,您应该查看 joinWithCassandraTable,这对于您正在做的事情来说应该更容易 api(前提是您有足够的分区使其值得)。这个 api 采用分区键的 RDD,并从 C* 中定性和分发它们的检索。

这会让你做类似的事情

sc.parallelize(partitions).joinWithCassandraTable("keyspace","table")

如果您愿意,也可以执行 repartitionByCassandraReplica 但这很可能对非常小的请求没有好处。您必须对数据进行基准测试才能确定。

如果你只想执行原始驱动程序命令,你可以执行类似

的操作
val cc = CassandraConnector(sc.getConf)
partitions.mapPartitions{ it => 
  cc.withSessionDo{ session =>
    session.execute( Some query )
  }
}

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/2_loading.md#performing-efficient-joins-with-cassandra-tables-since-12

代码示例演练:

现在让我们快速浏览一下您的代码示例 首先,我们只检索 60 个 C* 分区。对于此用例,与从 C* 检索分区所需的时间相比,我们很可能会考虑设置和取消任务所需的时间。

在这两种解决方案中,由于 Spark 的惰性求值,您基本上都在做同样的事情。驱动程序创建一个图形,首先创建 60 个 RDD,每个 RDD 使用惰性指令从 C* 检索单个分区。 (每个分区 1 个 RDD 是不好的,RDD 旨在存储大量数据,因此最终会产生大量开销)。尽管 60 个 RDD 是用不同的模式制作的,但这并不重要,因为它们的实际计算不会发生,直到您调用 collect。驱动程序继续设置新的 RDD 和转换。

在我们点击收集之前,绝对没有任何事情可以从 C* 中检索数据,因为我们点击收集时,对于您在上面发布的两种解决方案,使用基本上相同的依赖关系图,所以在这两种情况下都会发生完全(或非常相似)的事情。所有 60 个 RDD 都将按照依赖图指定的方式解析。这将并行发生,但同样会占用大量开销。

为了避免这种情况,请查看我上面使用单个 RDD 提取所有信息的示例。