datastax 会话因大量并行查询而挂起
datastax session hangs with high volume parallel queries
大图。
并行处理 2000 个查询时,datastax 会话挂起。
并行查询
我正在使用包装 Datastax Cassandra 驱动程序的 Alpakka。我正在使用 Scala Play 框架。
要对大数据进行行统计,必须要按分区进行。我使用以下代码计算每个分区的行数:
val futureList: ListBuffer[Future[Any]] = new ListBuffer[Future[Any]]
val acc: mutable.HashMap[String, Long] = new mutable.HashMap[String, Long]()
targets.isDefined match {
case true =>
targets.get.foreach {
e =>
val cq: CassandraQueries = new CassandraQueries()
Logger.info("targets collected so far: "+acc.size)
Logger.info("Calling count for "+e._1)
futureList += cq.futureQuery("SELECT count(*) FROM " + keyspaceName + ".\"sparseData\" where label = " + e._2 + ";", sparseRowCountResult(acc, e._1), 120000)
}
val results = Future.sequence(futureList.toList)
在我的一个键空间中,我有 2000 个分区,因此有 2000 个并行查询。
查询结果
查询由Alpakka/Datastax和returns一个Future[Seq[Row]].
处理
Logger.info("furtureQuery: session closed -> "+ session.isClosed)
val stmt = new SimpleStatement(query).setFetchSize(200).setReadTimeoutMillis(readTimeoutMillis)
val sb: StringBuilder = new StringBuilder()
val source = CassandraSource(stmt)
source.runWith(Sink.seq).onComplete {
case Success(f) => out(Some(f), None)
case Failure(e) =>
Logger.error("simpleQuery failed with " + e.getMessage)
out(None, Some(e.getMessage))
}
异常并挂起
大约 1000 次查询后,出现以下错误。在此之后,会话中没有任何内容 returns。 Success
和 Failure
都没有发生。
akka.ConfigurationException: Logger specified in config can't be
loaded [akka.event.Logging$DefaultLogger] due to
[akka.event.Logging$LoggerInitializationException: Logger
log1-Logging$DefaultLogger did not respond with LoggerInitialized,
sent instead [TIMEOUT]]
问题
我确定我可以延长日志记录的超时时间。但这是表象,不是真正的问题。
我该怎么做:
- 配置会话连接以允许 2000 个并行请求?
或
- 限制 Future.sequence 到已知的可能请求数量?
还有
- 如何以编程方式从此类 Sessiion 挂起中恢复?
您可以通过在创建集群实例时指定池选项来增加每个连接的运行中请求数,如下所示:
PoolingOptions poolingOptions = new PoolingOptions();
poolingOptions.setMaxRequestsPerConnection(HostDistance.LOCAL, 10240);
Cluster cluster = Cluster.builder()
.withContactPoints("127.0.0.1")
.withPoolingOptions(poolingOptions)
.build();
但是您仍然需要在代码中处理 BusyPoolException
,因为在使用异步请求时,仍然很容易使某个特定连接过载。
更多信息见 driver's documentation。
而是触发 2000 个查询来执行范围查询。利用集群对象元数据,获取令牌范围并计算密钥的令牌。然后,在一个范围查询中批量处理属于同一范围的查询。
大图。
并行处理 2000 个查询时,datastax 会话挂起。
并行查询
我正在使用包装 Datastax Cassandra 驱动程序的 Alpakka。我正在使用 Scala Play 框架。
要对大数据进行行统计,必须要按分区进行。我使用以下代码计算每个分区的行数:
val futureList: ListBuffer[Future[Any]] = new ListBuffer[Future[Any]]
val acc: mutable.HashMap[String, Long] = new mutable.HashMap[String, Long]()
targets.isDefined match {
case true =>
targets.get.foreach {
e =>
val cq: CassandraQueries = new CassandraQueries()
Logger.info("targets collected so far: "+acc.size)
Logger.info("Calling count for "+e._1)
futureList += cq.futureQuery("SELECT count(*) FROM " + keyspaceName + ".\"sparseData\" where label = " + e._2 + ";", sparseRowCountResult(acc, e._1), 120000)
}
val results = Future.sequence(futureList.toList)
在我的一个键空间中,我有 2000 个分区,因此有 2000 个并行查询。
查询结果
查询由Alpakka/Datastax和returns一个Future[Seq[Row]].
Logger.info("furtureQuery: session closed -> "+ session.isClosed)
val stmt = new SimpleStatement(query).setFetchSize(200).setReadTimeoutMillis(readTimeoutMillis)
val sb: StringBuilder = new StringBuilder()
val source = CassandraSource(stmt)
source.runWith(Sink.seq).onComplete {
case Success(f) => out(Some(f), None)
case Failure(e) =>
Logger.error("simpleQuery failed with " + e.getMessage)
out(None, Some(e.getMessage))
}
异常并挂起
大约 1000 次查询后,出现以下错误。在此之后,会话中没有任何内容 returns。 Success
和 Failure
都没有发生。
akka.ConfigurationException: Logger specified in config can't be loaded [akka.event.Logging$DefaultLogger] due to [akka.event.Logging$LoggerInitializationException: Logger log1-Logging$DefaultLogger did not respond with LoggerInitialized, sent instead [TIMEOUT]]
问题
我确定我可以延长日志记录的超时时间。但这是表象,不是真正的问题。
我该怎么做:
- 配置会话连接以允许 2000 个并行请求?
或
- 限制 Future.sequence 到已知的可能请求数量?
还有
- 如何以编程方式从此类 Sessiion 挂起中恢复?
您可以通过在创建集群实例时指定池选项来增加每个连接的运行中请求数,如下所示:
PoolingOptions poolingOptions = new PoolingOptions();
poolingOptions.setMaxRequestsPerConnection(HostDistance.LOCAL, 10240);
Cluster cluster = Cluster.builder()
.withContactPoints("127.0.0.1")
.withPoolingOptions(poolingOptions)
.build();
但是您仍然需要在代码中处理 BusyPoolException
,因为在使用异步请求时,仍然很容易使某个特定连接过载。
更多信息见 driver's documentation。
而是触发 2000 个查询来执行范围查询。利用集群对象元数据,获取令牌范围并计算密钥的令牌。然后,在一个范围查询中批量处理属于同一范围的查询。