cassandra spark 连接器读取性能

cassandra spark connector read performance

我有一些 Spark 经验,但刚开始使用 Cassandra。我正在尝试做一个非常简单的阅读并获得非常糟糕的性能 - 无法说出原因。这是我正在使用的代码:

sc.cassandraTable("nt_live_october","nt")
  .where("group_id='254358'")
  .where("epoch >=1443916800 and epoch<=1444348800")
  .first

所有 3 个参数都是 table 上密钥的一部分:

PRIMARY KEY (group_id, epoch, group_name, auto_generated_uuid_field) ) WITH CLUSTERING ORDER BY (epoch ASC, group_name ASC, auto_generated_uuid_field ASC)

我从我的驱动程序中看到的输出是这样的:

15/10/07 15:05:02 INFO CassandraConnector: Connected to Cassandra cluster: shakassandra 15/10/07 15:07:02 ERROR Session: Error creating pool to attila./198.xxx:9042 com.datastax.driver.core.ConnectionException: [attila./198.xxx:9042] Unexpected error during transport initialization (com.datastax.driver.core.OperationTimedOutException: [attila /198.xxx:9042] Operation timed out)

15/10/07 15:07:02 INFO SparkContext: Starting job: take at CassandraRDD.scala:121

15/10/07 15:07:03 INFO BlockManagerInfo: Added broadcast_5_piece0 in memory on osd09:39903 (size: 4.8 KB, free: 265.4 MB)

15/10/07 15:08:23 INFO TaskSetManager: Finished task 0.0 in stage 6.0 (TID 8) in 80153 ms on osd09 (1/1)

15/10/07 15:08:23 INFO TaskSetManager: Finished task 0.0 in stage 6.0 (TID 8) in 80153 ms on osd09 (1/1)

15/10/07 15:08:23 INFO DAGScheduler: ResultStage 6 (take at CassandraRDD.scala:121) finished in 80.958 s 15/10/07 15:08:23 INFO TaskSchedulerImpl: Removed TaskSet 6.0, whose tasks have all completed, from pool

15/10/07 15:08:23 INFO DAGScheduler: Job 5 finished: take at CassandraRDD.scala:121, took 81.043413 s

我希望这个查询非常快,但它需要一分钟多的时间。有几件事让我印象深刻

  1. 获取会话错误需要将近两分钟——我将 3 个节点的 IP 传递给 Spark Cassandra 连接器——有没有办法告诉它更快地跳过失败的连接?
  2. 任务被发送到不是 Cassandra 节点的 Spark worker -- 这对我来说似乎很奇怪 -- 有没有办法获取关于为什么调度程序选择将任务发送到远程节点的信息?
  3. 即使任务被发送到远程节点,该工作程序上的输入大小(最大)显示为 334.0 B / 1,但执行程序时间为 1.3 分钟(见图)。这看起来真的很慢——我希望时间花在反序列化上,而不是计算上……

非常感谢有关如何调试此问题的任何提示,以及在哪里寻找潜在问题。使用带有连接器 1.4.0-M3 的 Spark 1.4.1,cassandra ReleaseVersion:2.1.9,可调连接器参数的所有默认值

我认为问题在于分区之间的数据分布。您的 table 有一个集群(分区)键 - groupId,epoch 只是一个集群列。数据仅通过 groupId 分布在集群节点上,因此您在集群的一个节点上有一个 groupId='254358' 的巨大分区。 当您 运行 您的查询 Cassandra 到达 groupId='254358' 的非常快的分区,然后过滤所有行以查找纪元在 1443916800 和 1444348800 之间的记录。如果有很多行,查询将非常慢。实际上这个查询不是分布式的,它总是 运行 在一个节点上。

更好的做法是提取日期甚至小时并将其添加为分区键,在您的情况下类似于

PRIMARY KEY ((group_id, date), epoch, group_name, auto_generated_uuid_field) 
WITH CLUSTERING ORDER BY (epoch ASC, group_name ASC, auto_generated_uuid_field ASC)

要验证我的假设,您可以 运行 您在 cqlsh 中打开跟踪的当前查询阅读 here 操作方法。所以问题与Spark无关。

关于错误和获取它的时间,一切都很好,因为您在超时发生后收到错误。

我还记得 spark-cassandra-connector 的建议,将 Spark 从节点准确地连接到 Cassandra 节点,以通过分区键分发查询。