Cassandra Cluster 无法通过 Spark 看到节点
Cassandra Cluster can not see nodes through Spark
我正在尝试通过 Spark 进行写入。
我的集群中有 6 个节点,我在其中创建了要写入数据的键空间:
CREATE KEYSPACE traffic WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '2'} AND durable_writes = true;
当我尝试从 Spark 写入时,我遇到了这种错误:
16/08/17 16:14:57 ERROR QueryExecutor: Failed to execute: com.datastax.spark.connector.writer.RichBatchStatement@7409fd2d
com.datastax.driver.core.exceptions.UnavailableException: Not enough replicas available for query at consistency ONE (1 required but only 0 alive)
这是我在做什么的代码片段:
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.types.{StructType, StructField, DateType, IntegerType};
object ff {
def main(string: Array[String]) {
val conf = new SparkConf()
.set("spark.cassandra.connection.host", "127.0.0.1")
.set("spark.cassandra.connection.host","ONE")
.setMaster("local[4]")
.setAppName("ff")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true") // Use first line of all files as header
.option("inferSchema", "true")
.load("test.csv")
df.registerTempTable("ff_table")
//df.printSchema()
df.count
time {
df.write
.format("org.apache.spark.sql.cassandra")
.options(Map("table" -> "ff_table", "keyspace" -> "traffic"))
.save()
}
def time[A](f: => A) = {
val s = System.nanoTime
val ret = f
println("time: " + (System.nanoTime - s) / 1e6 + "ms")
ret
}
}
}
另外,如果我 运行 nodetool describecluster
我得到了这个结果:
Cluster Information:
Name: Test Cluster
Snitch: org.apache.cassandra.locator.DynamicEndpointSnitch
Partitioner: org.apache.cassandra.dht.Murmur3Partitioner
Schema versions:
bf6c3ae7-5c8b-3e5d-9794-8e34bee9278f: [127.0.0.1, 127.0.0.2, 127.0.0.3, 127.0.0.4, 127.0.0.5, 127.0.0.6]
我尝试在 replication_factor
:2 的行上插入 CLI,它正在运行,因此每个节点都可以看到彼此。
为什么 Spark 不能插入任何东西,为什么节点在尝试从 Spark 插入数据时看不到彼此,有人知道吗?
看起来您正在通过环回在一台机器上 运行连接 6 个节点。这意味着这台机器的资源很有可能被过度订阅。各种 Cassandra 实例很可能轮流或交换,这导致它们在重负载时丢失。增加复制因子会增加有效目标启动的机会,但会进一步增加负载。
C* 需要来自您系统的几种不同资源的核心,如果其中任何一个成为瓶颈,则节点有可能无法在足够的时间内响应八卦。
这些资源是
RAM - JVM 能够获取多少内存,这也受 OS swap 的影响。这意味着如果您分配了一个大型 JVM 但 OS 将其交换到磁盘,您可能会遇到大量性能问题。对于同一台机器上的多个节点,您需要确保您启动的每个节点的 JVM 都有足够的内存。此外,如果任何一个实例的 JVM 接近满,您将进入 GC 并可能进入 GC Storm,这基本上会锁定该实例。 system.log.
中会清楚其中的许多详细信息
CPU - 如果没有对至少一个 cpu 的独占访问权,您几乎可以保证在 C* 中安排一些重要的线程,它们之间有很长的延迟。这可能会导致 gossip 线程被忽略并且 gossip 失败。这会给一些节点一个集群的视图,其中有故障机器并导致不可用错误。
DISK - 每个 Cassandra 实例都将维护它自己的 CommitLog 和 HD 文件。提交日志每 10 秒刷新一次,如果您有多个实例并且只有 1 个硬盘驱动器,则提交日志和普通内存表之间的刷新很容易相互阻塞。压缩进一步加剧了这种情况,这需要另外大量的 IO。
网络 - 虽然这不是同一台机器上的多个节点的问题。
总而言之,
重要的是要确保分配给您的 C* 实例的资源足够小,以至于没有实例会超过 运行 另一个实例的 space/ram/cpu。如果这样做,您最终会得到一个集群,其在负载下通信失败,因为上述资源之一出现瓶颈。这并不意味着不可能 运行 同一台机器上的多个节点,但确实意味着您必须小心配置。您还可以尝试通过限制写入速度来减轻负载,这将减少节点相互破坏的机会。
我正在尝试通过 Spark 进行写入。 我的集群中有 6 个节点,我在其中创建了要写入数据的键空间:
CREATE KEYSPACE traffic WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '2'} AND durable_writes = true;
当我尝试从 Spark 写入时,我遇到了这种错误:
16/08/17 16:14:57 ERROR QueryExecutor: Failed to execute: com.datastax.spark.connector.writer.RichBatchStatement@7409fd2d
com.datastax.driver.core.exceptions.UnavailableException: Not enough replicas available for query at consistency ONE (1 required but only 0 alive)
这是我在做什么的代码片段:
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.types.{StructType, StructField, DateType, IntegerType};
object ff {
def main(string: Array[String]) {
val conf = new SparkConf()
.set("spark.cassandra.connection.host", "127.0.0.1")
.set("spark.cassandra.connection.host","ONE")
.setMaster("local[4]")
.setAppName("ff")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true") // Use first line of all files as header
.option("inferSchema", "true")
.load("test.csv")
df.registerTempTable("ff_table")
//df.printSchema()
df.count
time {
df.write
.format("org.apache.spark.sql.cassandra")
.options(Map("table" -> "ff_table", "keyspace" -> "traffic"))
.save()
}
def time[A](f: => A) = {
val s = System.nanoTime
val ret = f
println("time: " + (System.nanoTime - s) / 1e6 + "ms")
ret
}
}
}
另外,如果我 运行 nodetool describecluster
我得到了这个结果:
Cluster Information:
Name: Test Cluster
Snitch: org.apache.cassandra.locator.DynamicEndpointSnitch
Partitioner: org.apache.cassandra.dht.Murmur3Partitioner
Schema versions:
bf6c3ae7-5c8b-3e5d-9794-8e34bee9278f: [127.0.0.1, 127.0.0.2, 127.0.0.3, 127.0.0.4, 127.0.0.5, 127.0.0.6]
我尝试在 replication_factor
:2 的行上插入 CLI,它正在运行,因此每个节点都可以看到彼此。
为什么 Spark 不能插入任何东西,为什么节点在尝试从 Spark 插入数据时看不到彼此,有人知道吗?
看起来您正在通过环回在一台机器上 运行连接 6 个节点。这意味着这台机器的资源很有可能被过度订阅。各种 Cassandra 实例很可能轮流或交换,这导致它们在重负载时丢失。增加复制因子会增加有效目标启动的机会,但会进一步增加负载。
C* 需要来自您系统的几种不同资源的核心,如果其中任何一个成为瓶颈,则节点有可能无法在足够的时间内响应八卦。
这些资源是 RAM - JVM 能够获取多少内存,这也受 OS swap 的影响。这意味着如果您分配了一个大型 JVM 但 OS 将其交换到磁盘,您可能会遇到大量性能问题。对于同一台机器上的多个节点,您需要确保您启动的每个节点的 JVM 都有足够的内存。此外,如果任何一个实例的 JVM 接近满,您将进入 GC 并可能进入 GC Storm,这基本上会锁定该实例。 system.log.
中会清楚其中的许多详细信息CPU - 如果没有对至少一个 cpu 的独占访问权,您几乎可以保证在 C* 中安排一些重要的线程,它们之间有很长的延迟。这可能会导致 gossip 线程被忽略并且 gossip 失败。这会给一些节点一个集群的视图,其中有故障机器并导致不可用错误。
DISK - 每个 Cassandra 实例都将维护它自己的 CommitLog 和 HD 文件。提交日志每 10 秒刷新一次,如果您有多个实例并且只有 1 个硬盘驱动器,则提交日志和普通内存表之间的刷新很容易相互阻塞。压缩进一步加剧了这种情况,这需要另外大量的 IO。
网络 - 虽然这不是同一台机器上的多个节点的问题。
总而言之, 重要的是要确保分配给您的 C* 实例的资源足够小,以至于没有实例会超过 运行 另一个实例的 space/ram/cpu。如果这样做,您最终会得到一个集群,其在负载下通信失败,因为上述资源之一出现瓶颈。这并不意味着不可能 运行 同一台机器上的多个节点,但确实意味着您必须小心配置。您还可以尝试通过限制写入速度来减轻负载,这将减少节点相互破坏的机会。