使用 spark-cassandra 连接器的 Cassandra 插入性能
Cassandra insert performance using spark-cassandra connector
我是 spark 和 cassandra 的新手。我正在尝试使用 spark-cassandra 连接器插入 cassandra table,如下所示:
import java.util.UUID
import org.apache.spark.{SparkContext, SparkConf}
import org.joda.time.DateTime
import com.datastax.spark.connector._
case class TestEntity(id:UUID, category:String, name:String,value:Double, createDate:DateTime, tag:Long)
object SparkConnectorContext {
val conf = new SparkConf(true).setMaster("local")
.set("spark.cassandra.connection.host", "192.168.xxx.xxx")
val sc = new SparkContext(conf)
}
object TestRepo {
def insertList(list: List[TestEntity]) = {
SparkConnectorContext.sc.parallelize(list).saveToCassandra("testKeySpace", "testColumnFamily")
}
}
object TestApp extends App {
val start = System.currentTimeMillis()
TestRepo.insertList(Utility.generateRandomData())
val end = System.currentTimeMillis()
val timeDiff = end-start
println("Difference (in millis)= "+timeDiff)
}
当我使用上述方法插入时(包含 100 个实体的列表),需要 300-1100 milliseconds
。
我尝试使用 phantom 库插入相同的数据。它只需要不到 20-40 milliseconds
。
谁能告诉我为什么 spark 连接器插入要花这么多时间?我在我的代码中做错了什么,还是不建议使用 spark-cassandra connector 进行插入操作?
您似乎在计时中包含了并行化操作。此外,由于您的 spark worker 运行ning 在与 Cassandra 不同的机器上,因此 saveToCassandra 操作将通过网络写入。
尝试将您的系统配置为 运行 Cassandra 节点上的 spark worker。然后在单独的步骤中创建一个 RDD,并在其上调用 count() 之类的操作以将数据加载到内存中。此外,您可能希望对 RDD 进行 persist() 或 cache() 以确保它保留在内存中以进行测试。
然后只对缓存的 RDD 的 saveToCassandra 计时。
您可能还想查看 Cassandra 连接器提供的 repartitionByCassandraReplica 方法。这将根据写入需要转到的 Cassandra 节点对 RDD 中的数据进行分区。通过这种方式,您可以利用数据局部性并经常避免通过网络进行写入和随机播放。
您的 "benchmark" 存在一些严重问题:
- 您的数据集太小,您主要只测量作业设置时间。在单个节点上保存 100 个实体应该是几毫秒,而不是几秒。此外,保存 100 个实体让 JVM 没有机会编译您 运行 优化的机器代码。
- 您在测量中包括了 spark 上下文初始化。 JVM延迟加载classes,所以spark初始化的代码真正是在测量开始后调用的。这是一个非常昂贵的元素,通常每个整个 spark 应用程序只执行一次,甚至不是每个作业。
- 您每次启动只执行一次测量。这意味着您甚至错误地测量了 spark ctx 设置和作业设置时间,因为 JVM 必须在第一次加载所有 classes,而 Hotspot 可能没有机会启动。
总而言之,您很可能主要测量 class 加载时间,这取决于加载的 classes 的大小和数量。 Spark 的加载量很大,几百毫秒一点也不奇怪。
要正确测量插入性能:
- 使用更大的数据集
- 从测量中排除一次性设置
- 多次运行共享相同的 spark 上下文并丢弃一些初始的,直到达到稳定状态性能。
顺便说一句,如果启用调试日志记录级别,连接器会在执行程序日志中记录每个分区的插入时间。
我是 spark 和 cassandra 的新手。我正在尝试使用 spark-cassandra 连接器插入 cassandra table,如下所示:
import java.util.UUID
import org.apache.spark.{SparkContext, SparkConf}
import org.joda.time.DateTime
import com.datastax.spark.connector._
case class TestEntity(id:UUID, category:String, name:String,value:Double, createDate:DateTime, tag:Long)
object SparkConnectorContext {
val conf = new SparkConf(true).setMaster("local")
.set("spark.cassandra.connection.host", "192.168.xxx.xxx")
val sc = new SparkContext(conf)
}
object TestRepo {
def insertList(list: List[TestEntity]) = {
SparkConnectorContext.sc.parallelize(list).saveToCassandra("testKeySpace", "testColumnFamily")
}
}
object TestApp extends App {
val start = System.currentTimeMillis()
TestRepo.insertList(Utility.generateRandomData())
val end = System.currentTimeMillis()
val timeDiff = end-start
println("Difference (in millis)= "+timeDiff)
}
当我使用上述方法插入时(包含 100 个实体的列表),需要 300-1100 milliseconds
。
我尝试使用 phantom 库插入相同的数据。它只需要不到 20-40 milliseconds
。
谁能告诉我为什么 spark 连接器插入要花这么多时间?我在我的代码中做错了什么,还是不建议使用 spark-cassandra connector 进行插入操作?
您似乎在计时中包含了并行化操作。此外,由于您的 spark worker 运行ning 在与 Cassandra 不同的机器上,因此 saveToCassandra 操作将通过网络写入。
尝试将您的系统配置为 运行 Cassandra 节点上的 spark worker。然后在单独的步骤中创建一个 RDD,并在其上调用 count() 之类的操作以将数据加载到内存中。此外,您可能希望对 RDD 进行 persist() 或 cache() 以确保它保留在内存中以进行测试。
然后只对缓存的 RDD 的 saveToCassandra 计时。
您可能还想查看 Cassandra 连接器提供的 repartitionByCassandraReplica 方法。这将根据写入需要转到的 Cassandra 节点对 RDD 中的数据进行分区。通过这种方式,您可以利用数据局部性并经常避免通过网络进行写入和随机播放。
您的 "benchmark" 存在一些严重问题:
- 您的数据集太小,您主要只测量作业设置时间。在单个节点上保存 100 个实体应该是几毫秒,而不是几秒。此外,保存 100 个实体让 JVM 没有机会编译您 运行 优化的机器代码。
- 您在测量中包括了 spark 上下文初始化。 JVM延迟加载classes,所以spark初始化的代码真正是在测量开始后调用的。这是一个非常昂贵的元素,通常每个整个 spark 应用程序只执行一次,甚至不是每个作业。
- 您每次启动只执行一次测量。这意味着您甚至错误地测量了 spark ctx 设置和作业设置时间,因为 JVM 必须在第一次加载所有 classes,而 Hotspot 可能没有机会启动。
总而言之,您很可能主要测量 class 加载时间,这取决于加载的 classes 的大小和数量。 Spark 的加载量很大,几百毫秒一点也不奇怪。
要正确测量插入性能:
- 使用更大的数据集
- 从测量中排除一次性设置
- 多次运行共享相同的 spark 上下文并丢弃一些初始的,直到达到稳定状态性能。
顺便说一句,如果启用调试日志记录级别,连接器会在执行程序日志中记录每个分区的插入时间。