如何在读取 spark 中的 cassandra 分区时获得良好的性能?
How to get good performance on reading cassandra partitions in spark?
我正在使用 cassandra-connector.I 尝试以下读取解决方案从 cassandra 分区读取数据以激发 partitions.I 尝试通过尽可能多地创建 rdds 来并行化任务,但解决方案 ONE 和解决方案两人表现相同。
在解决方案一中,我可以立即看到 spark UI 中的阶段。我试图在解决方案二中避免一个 for 循环。
在解决方案二中,阶段出现在大量 time.Also 之后,随着用户 ID 数量的增加,然后在阶段出现在火花 UI 中之前的时间显着增加,解决方案二。
Version
spark - 1.1
Dse - 4.6
cassandra-connector -1.1
Setup
3 - Nodes with spark cassandra
Each node has 1 core dedicated to this task.
512MB ram for the executor memory.
我的 cassandra Table 模式,
CREATE TABLE test (
user text,
userid bigint,
period timestamp,
ip text,
data blob,
PRIMARY KEY((user,userid,period),ip)
);
第一个解决方案:
val users = List("u1","u2","u3")
val period = List("2000-05-01","2000-05-01")
val partitions = users.flatMap(x => period.map(y => (x,y))))
val userids = 1 to 10
for (userid <- userids){
val rdds = partitions.map(x => sc.cassandraTable("test_keyspace","table1")
.select("data")
.where("user=?", x._1)
.where("period=?",x._2)
.where("userid=?,userid)
)
val combinedRdd = sc.union(rdds)
val result = combinedRdd.map(getDataFromColumns)
.coalesce(4)
.reduceByKey((x,y) => x+y)
.collect()
result.foreach(prinltn)
}
第二个解决方案:
val users = List("u1","u2","u3")
val period = List("2000-05-01","2000-05-01")
val userids = 1 to 10
val partitions = users.flatMap(x => period.flatMap(
y => userids.map(z => (x,y,z))))
val rdds = partitions.map(x => sc.cassandraTable("test_keyspace","table1")
.select("data")
.where("user=?", x._1)
.where("period=?",x._2)
.where("userid=?,x._3)
)
val combinedRdd = sc.union(rdds)
val result = combinedRdd.map(getDataFromColumns)
.coalesce(4)
.reduceByKey((x,y) => x+y)
.collect()
result.foreach(prinltn)
为什么解决方案二不比解决方案一快?
我的理解是,由于所有分区都是一次性查询的,并且数据是跨节点分布的,因此应该更快。
如有不妥请指正
首先,您应该查看 joinWithCassandraTable,这对于您正在做的事情来说应该更容易 api(前提是您有足够的分区使其值得)。这个 api 采用分区键的 RDD,并从 C* 中定性和分发它们的检索。
这会让你做类似的事情
sc.parallelize(partitions).joinWithCassandraTable("keyspace","table")
如果您愿意,也可以执行 repartitionByCassandraReplica
但这很可能对非常小的请求没有好处。您必须对数据进行基准测试才能确定。
如果你只想执行原始驱动程序命令,你可以执行类似
的操作
val cc = CassandraConnector(sc.getConf)
partitions.mapPartitions{ it =>
cc.withSessionDo{ session =>
session.execute( Some query )
}
}
代码示例演练:
现在让我们快速浏览一下您的代码示例
首先,我们只检索 60 个 C* 分区。对于此用例,与从 C* 检索分区所需的时间相比,我们很可能会考虑设置和取消任务所需的时间。
在这两种解决方案中,由于 Spark 的惰性求值,您基本上都在做同样的事情。驱动程序创建一个图形,首先创建 60 个 RDD,每个 RDD 使用惰性指令从 C* 检索单个分区。 (每个分区 1 个 RDD 是不好的,RDD 旨在存储大量数据,因此最终会产生大量开销)。尽管 60 个 RDD 是用不同的模式制作的,但这并不重要,因为它们的实际计算不会发生,直到您调用 collect。驱动程序继续设置新的 RDD 和转换。
在我们点击收集之前,绝对没有任何事情可以从 C* 中检索数据,因为我们点击收集时,对于您在上面发布的两种解决方案,使用基本上相同的依赖关系图,所以在这两种情况下都会发生完全(或非常相似)的事情。所有 60 个 RDD 都将按照依赖图指定的方式解析。这将并行发生,但同样会占用大量开销。
为了避免这种情况,请查看我上面使用单个 RDD 提取所有信息的示例。
我正在使用 cassandra-connector.I 尝试以下读取解决方案从 cassandra 分区读取数据以激发 partitions.I 尝试通过尽可能多地创建 rdds 来并行化任务,但解决方案 ONE 和解决方案两人表现相同。
在解决方案一中,我可以立即看到 spark UI 中的阶段。我试图在解决方案二中避免一个 for 循环。
在解决方案二中,阶段出现在大量 time.Also 之后,随着用户 ID 数量的增加,然后在阶段出现在火花 UI 中之前的时间显着增加,解决方案二。
Version
spark - 1.1
Dse - 4.6
cassandra-connector -1.1
Setup
3 - Nodes with spark cassandra
Each node has 1 core dedicated to this task.
512MB ram for the executor memory.
我的 cassandra Table 模式,
CREATE TABLE test (
user text,
userid bigint,
period timestamp,
ip text,
data blob,
PRIMARY KEY((user,userid,period),ip)
);
第一个解决方案:
val users = List("u1","u2","u3")
val period = List("2000-05-01","2000-05-01")
val partitions = users.flatMap(x => period.map(y => (x,y))))
val userids = 1 to 10
for (userid <- userids){
val rdds = partitions.map(x => sc.cassandraTable("test_keyspace","table1")
.select("data")
.where("user=?", x._1)
.where("period=?",x._2)
.where("userid=?,userid)
)
val combinedRdd = sc.union(rdds)
val result = combinedRdd.map(getDataFromColumns)
.coalesce(4)
.reduceByKey((x,y) => x+y)
.collect()
result.foreach(prinltn)
}
第二个解决方案:
val users = List("u1","u2","u3")
val period = List("2000-05-01","2000-05-01")
val userids = 1 to 10
val partitions = users.flatMap(x => period.flatMap(
y => userids.map(z => (x,y,z))))
val rdds = partitions.map(x => sc.cassandraTable("test_keyspace","table1")
.select("data")
.where("user=?", x._1)
.where("period=?",x._2)
.where("userid=?,x._3)
)
val combinedRdd = sc.union(rdds)
val result = combinedRdd.map(getDataFromColumns)
.coalesce(4)
.reduceByKey((x,y) => x+y)
.collect()
result.foreach(prinltn)
为什么解决方案二不比解决方案一快?
我的理解是,由于所有分区都是一次性查询的,并且数据是跨节点分布的,因此应该更快。 如有不妥请指正
首先,您应该查看 joinWithCassandraTable,这对于您正在做的事情来说应该更容易 api(前提是您有足够的分区使其值得)。这个 api 采用分区键的 RDD,并从 C* 中定性和分发它们的检索。
这会让你做类似的事情
sc.parallelize(partitions).joinWithCassandraTable("keyspace","table")
如果您愿意,也可以执行 repartitionByCassandraReplica
但这很可能对非常小的请求没有好处。您必须对数据进行基准测试才能确定。
如果你只想执行原始驱动程序命令,你可以执行类似
的操作val cc = CassandraConnector(sc.getConf)
partitions.mapPartitions{ it =>
cc.withSessionDo{ session =>
session.execute( Some query )
}
}
代码示例演练:
现在让我们快速浏览一下您的代码示例 首先,我们只检索 60 个 C* 分区。对于此用例,与从 C* 检索分区所需的时间相比,我们很可能会考虑设置和取消任务所需的时间。
在这两种解决方案中,由于 Spark 的惰性求值,您基本上都在做同样的事情。驱动程序创建一个图形,首先创建 60 个 RDD,每个 RDD 使用惰性指令从 C* 检索单个分区。 (每个分区 1 个 RDD 是不好的,RDD 旨在存储大量数据,因此最终会产生大量开销)。尽管 60 个 RDD 是用不同的模式制作的,但这并不重要,因为它们的实际计算不会发生,直到您调用 collect。驱动程序继续设置新的 RDD 和转换。
在我们点击收集之前,绝对没有任何事情可以从 C* 中检索数据,因为我们点击收集时,对于您在上面发布的两种解决方案,使用基本上相同的依赖关系图,所以在这两种情况下都会发生完全(或非常相似)的事情。所有 60 个 RDD 都将按照依赖图指定的方式解析。这将并行发生,但同样会占用大量开销。
为了避免这种情况,请查看我上面使用单个 RDD 提取所有信息的示例。