有没有办法找出Cassandra中SELECT语句使用了哪个节点?
Is there any way to find out which node has been used by SELECT statement in Cassandra?
我已经为 spark-cassandra-connector
编写了自定义 LoadBalancerPolicy,现在我想确保它真的有效!
我有一个 Cassandra 集群,有 3 个节点和一个复制因子为 2 的键空间,所以当我们想要检索记录时,cassandra 上只有两个节点保存数据。
问题是我想确保 spark-cassandra-connector
(使用我的负载均衡器策略)仍然是令牌感知的,并且会为每个 "SELECT" 语句选择正确的节点作为协调器。
现在,我想如果我们可以在每个节点的 SELECT 语句上写一个触发器,以防节点不保存数据,触发器将创建一个日志,我意识到负载平衡器策略无法正常工作。我们如何在 Cassandra 中写一个触发器 On SELECT ?有没有更好的方法来实现?
我已经查看了创建触发器的文档,但它们太有限了:
如果你get routing key for your bound statement (you must use prepared statements), find the replicas for it via Metadata class, and then compare if this host is in the ExecutionInfo你可以从ResultSet
.
得到,你可以从程序方面做到这一点
根据Alex的说法,我们可以这样操作:
创建SparkSession后,我们应该做一个连接器:
import com.datastax.spark.connector.cql.CassandraConnector
val connector = CassandraConnector.apply(sparkSession.sparkContext.getConf)
现在我们可以定义一个 preparedStatement 并完成剩下的工作:
connector.withSessionDo(session => {
val selectQuery = "select * from test where id=?"
val prepareStatement = session.prepare(selectQuery)
val protocolVersion = session.getCluster.getConfiguration.getProtocolOptions.getProtocolVersion
// We have to explicitly bind the all of parameters that partition key is based on them, otherwise the routingKey will be null.
val boundStatement = prepareStatement.bind(s"$id")
val routingKey = boundStatement.getRoutingKey(protocolVersion, null)
// We can get tha all of nodes that contains the row
val replicas = session.getCluster.getMetadata.getReplicas("test", routingKey)
val resultSet = session.execute(boundStatement)
// We can get the node which gave us the row
val host = resultSet.getExecutionInfo.getQueriedHost
// Final step is to check whether the replicas contains the host or not!!!
if (replicas.contains(host)) println("It works!")
})
重要的是我们必须显式绑定分区键基于它们的所有参数(即我们不能在 SELECT 语句中将它们设置为硬编码),否则 routingKey 将是空。
我已经为 spark-cassandra-connector
编写了自定义 LoadBalancerPolicy,现在我想确保它真的有效!
我有一个 Cassandra 集群,有 3 个节点和一个复制因子为 2 的键空间,所以当我们想要检索记录时,cassandra 上只有两个节点保存数据。
问题是我想确保 spark-cassandra-connector
(使用我的负载均衡器策略)仍然是令牌感知的,并且会为每个 "SELECT" 语句选择正确的节点作为协调器。
现在,我想如果我们可以在每个节点的 SELECT 语句上写一个触发器,以防节点不保存数据,触发器将创建一个日志,我意识到负载平衡器策略无法正常工作。我们如何在 Cassandra 中写一个触发器 On SELECT ?有没有更好的方法来实现?
我已经查看了创建触发器的文档,但它们太有限了:
如果你get routing key for your bound statement (you must use prepared statements), find the replicas for it via Metadata class, and then compare if this host is in the ExecutionInfo你可以从ResultSet
.
根据Alex的说法,我们可以这样操作:
创建SparkSession后,我们应该做一个连接器:
import com.datastax.spark.connector.cql.CassandraConnector
val connector = CassandraConnector.apply(sparkSession.sparkContext.getConf)
现在我们可以定义一个 preparedStatement 并完成剩下的工作:
connector.withSessionDo(session => {
val selectQuery = "select * from test where id=?"
val prepareStatement = session.prepare(selectQuery)
val protocolVersion = session.getCluster.getConfiguration.getProtocolOptions.getProtocolVersion
// We have to explicitly bind the all of parameters that partition key is based on them, otherwise the routingKey will be null.
val boundStatement = prepareStatement.bind(s"$id")
val routingKey = boundStatement.getRoutingKey(protocolVersion, null)
// We can get tha all of nodes that contains the row
val replicas = session.getCluster.getMetadata.getReplicas("test", routingKey)
val resultSet = session.execute(boundStatement)
// We can get the node which gave us the row
val host = resultSet.getExecutionInfo.getQueriedHost
// Final step is to check whether the replicas contains the host or not!!!
if (replicas.contains(host)) println("It works!")
})
重要的是我们必须显式绑定分区键基于它们的所有参数(即我们不能在 SELECT 语句中将它们设置为硬编码),否则 routingKey 将是空。