Cassandra:节点变得不可用,同时使用 Spark 摄取
Cassandra : node become unavailable, while ingesting with Spark
在使用 Spark 成功地将数据提取到 Cassandra 之后,
现在每次我尝试使用 Spark 摄取数据时(几分钟后或立即)都会返回错误:
Caused by: com.datastax.oss.driver.api.core.AllNodesFailedException: Could not reach any contact point, make sure you've provided valid addresses
我用简单的 CQLSH(不是 Spark)检查过,确实也返回了类似的错误(2 个节点,共 4 个节点):
Connection error: ('Unable to connect to any servers', {'1.2.3.4': error(111, "Tried connecting to [('1.2.3.4', 9042)]. Last error: Connection refused")})
所以基本上,当我使用 Spark 摄取到 Cassandra 时,一些节点会在某个时候出现故障。我必须重新启动节点,以便通过 cqlsh(和 spark)再次访问它。
奇怪的是,当我 运行 nodetool status
时,给定节点仍然写为“UP”,而 cqlsh
告诉 connection refused
该节点.
我尝试调查日志,但我遇到了一个大问题:日志中没有任何内容,服务器端没有触发任何异常。
我的情况该怎么办?为什么在这种情况下节点会出现故障或变得无响应?如何预防?
谢谢
!!!编辑!!!
要求的一些细节,如下:
Cassandra 基础设施:
- 网络:10 gbps
- 两个数据中心:
datacenter1
和 datacenter2
- 每个数据中心 4 个节点
- 每个数据中心 2 个副本:
CREATE KEYSPACE my_keyspace WITH replication = {'class': 'NetworkTopologyStrategy', 'datacenter1': '2', 'datacenter2': '2'} AND durable_writes = true;
- 用于输入和输出的一致性:
LOCAL_QUORUM
- 每个节点的总物理内存:128GB。
- 每个节点的内存重新分区:64GB 专用于每个 Cassandra 实例,64GB 专用于每个 Spark worker(位于每个 Cassandra 节点上)
- 存储:每个节点 4 TB NVME
Spark 应用程序配置:
- 总执行器内核:24 个内核(4 个实例 * 每个实例 6 个内核)
- 总执行器内存:48 GB(4 个实例 * 每个实例 8 GB)
- spark 上的 cassandra 配置:
spark.sql.catalog.cassandra.spark.cassandra.output.batch.size.rows 1
spark.sql.catalog.cassandra.spark.cassandra.output.concurrent.writes 100
spark.sql.catalog.cassandra.spark.cassandra.output.batch.grouping.key none
spark.sql.catalog.cassandra.spark.cassandra.output.throughputMBPerSec 80
spark.sql.catalog.cassandra.spark.cassandra.output.consistency.level LOCAL_QUORUM
spark.sql.catalog.cassandra.spark.cassandra.output.metrics false
spark.sql.catalog.cassandra.spark.cassandra.connection.timeoutMS 90000
spark.sql.catalog.cassandra.spark.cassandra.query.retry.count 10
spark.sql.catalog.cassandra com.datastax.spark.connector.datasource.CassandraCatalog
spark.sql.extensions com.datastax.spark.connector.CassandraSparkExtensions
(2 nodes of 4)
只是好奇,但密钥空间的复制因子 (RF) 是多少,写入操作使用的一致性级别是多少?
我会附和亚历克斯,并说通常会发生这种情况,因为 Spark 的写入速度快于 Cassandra 的处理速度。这给你留下了两个选择:
- 增加集群的大小以处理写入负载。
- 限制 Spark 作业的写入吞吐量。
值得一提的是:
2 replicas per datacenter
consistency used for input and output : LOCAL_QUORUM
因此,通过将写入一致性降低到 LOCAL_ONE
,您可能会获得更多吞吐量。
记住,quorum == RF / 2 + 1,也就是说 LOCAL_QUORUM
of 2 is 2.
所以我确实建议降低到 LOCAL_ONE
,因为现在 Spark 在 ALL
一致性下有效运行。
Which JMX indicators I need to care about ?
不记得它的确切名称,但如果您能找到磁盘 IOP 或吞吐量的指标,我想知道它是否达到了阈值并趋于平稳。
在使用 Spark 成功地将数据提取到 Cassandra 之后,
现在每次我尝试使用 Spark 摄取数据时(几分钟后或立即)都会返回错误:
Caused by: com.datastax.oss.driver.api.core.AllNodesFailedException: Could not reach any contact point, make sure you've provided valid addresses
我用简单的 CQLSH(不是 Spark)检查过,确实也返回了类似的错误(2 个节点,共 4 个节点):
Connection error: ('Unable to connect to any servers', {'1.2.3.4': error(111, "Tried connecting to [('1.2.3.4', 9042)]. Last error: Connection refused")})
所以基本上,当我使用 Spark 摄取到 Cassandra 时,一些节点会在某个时候出现故障。我必须重新启动节点,以便通过 cqlsh(和 spark)再次访问它。
奇怪的是,当我 运行 nodetool status
时,给定节点仍然写为“UP”,而 cqlsh
告诉 connection refused
该节点.
我尝试调查日志,但我遇到了一个大问题:日志中没有任何内容,服务器端没有触发任何异常。
我的情况该怎么办?为什么在这种情况下节点会出现故障或变得无响应?如何预防?
谢谢
!!!编辑!!!
要求的一些细节,如下:
Cassandra 基础设施:
- 网络:10 gbps
- 两个数据中心:
datacenter1
和datacenter2
- 每个数据中心 4 个节点
- 每个数据中心 2 个副本:
CREATE KEYSPACE my_keyspace WITH replication = {'class': 'NetworkTopologyStrategy', 'datacenter1': '2', 'datacenter2': '2'} AND durable_writes = true;
- 用于输入和输出的一致性:
LOCAL_QUORUM
- 每个节点的总物理内存:128GB。
- 每个节点的内存重新分区:64GB 专用于每个 Cassandra 实例,64GB 专用于每个 Spark worker(位于每个 Cassandra 节点上)
- 存储:每个节点 4 TB NVME
Spark 应用程序配置:
- 总执行器内核:24 个内核(4 个实例 * 每个实例 6 个内核)
- 总执行器内存:48 GB(4 个实例 * 每个实例 8 GB)
- spark 上的 cassandra 配置:
spark.sql.catalog.cassandra.spark.cassandra.output.batch.size.rows 1
spark.sql.catalog.cassandra.spark.cassandra.output.concurrent.writes 100
spark.sql.catalog.cassandra.spark.cassandra.output.batch.grouping.key none
spark.sql.catalog.cassandra.spark.cassandra.output.throughputMBPerSec 80
spark.sql.catalog.cassandra.spark.cassandra.output.consistency.level LOCAL_QUORUM
spark.sql.catalog.cassandra.spark.cassandra.output.metrics false
spark.sql.catalog.cassandra.spark.cassandra.connection.timeoutMS 90000
spark.sql.catalog.cassandra.spark.cassandra.query.retry.count 10
spark.sql.catalog.cassandra com.datastax.spark.connector.datasource.CassandraCatalog
spark.sql.extensions com.datastax.spark.connector.CassandraSparkExtensions
(2 nodes of 4)
只是好奇,但密钥空间的复制因子 (RF) 是多少,写入操作使用的一致性级别是多少?
我会附和亚历克斯,并说通常会发生这种情况,因为 Spark 的写入速度快于 Cassandra 的处理速度。这给你留下了两个选择:
- 增加集群的大小以处理写入负载。
- 限制 Spark 作业的写入吞吐量。
值得一提的是:
2 replicas per datacenter
consistency used for input and output : LOCAL_QUORUM
因此,通过将写入一致性降低到 LOCAL_ONE
,您可能会获得更多吞吐量。
记住,quorum == RF / 2 + 1,也就是说 LOCAL_QUORUM
of 2 is 2.
所以我确实建议降低到 LOCAL_ONE
,因为现在 Spark 在 ALL
一致性下有效运行。
Which JMX indicators I need to care about ?
不记得它的确切名称,但如果您能找到磁盘 IOP 或吞吐量的指标,我想知道它是否达到了阈值并趋于平稳。