Executing a large number of reads on Cassandra results in BusyPoolException
I am trying to execute a large number of queries with Phantom version 2.14.1, for example:
case class Foo(id: String, x: Long, y: Long)

// `list` holds about 100,000 entries
val list: List[Foo] = ???

def find(id: String, x: Long, y: Long) = {
  select
    .where(_.id eqs id)
    .and(_.ts >= x)
    .and(_.ts < y)
    .fetch()
}

// Starts one asynchronous query per element, all at once
list.map(f => find(f.id, f.x, f.y))
I get this exception:
[pool-2-thread-91] ERROR com.outworkers.phantom - Failed to execute query SELECT * FROM my_table WHERE id = 'some_uuid' AND x >= 1503501104 AND y < 1503501224;
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: localhost/0:0:0:0:0:0:0:1:9042 (com.datastax.driver.core.exceptions.BusyPoolException: [localhost/0:0:0:0:0:0:0:1] Pool is busy (no available connection and the queue has reached its max size 256)))
at com.datastax.driver.core.RequestHandler.reportNoMoreHosts(RequestHandler.java:220)
at com.datastax.driver.core.RequestHandler.access00(RequestHandler.java:50)
at com.datastax.driver.core.RequestHandler$SpeculativeExecution.findNextHostAndQuery(RequestHandler.java:291)
at com.datastax.driver.core.RequestHandler$SpeculativeExecution.onFailure(RequestHandler.java:358)
at com.google.common.util.concurrent.Futures.run(Futures.java:1764)
at com.google.common.util.concurrent.MoreExecutors$DirectExecutor.execute(MoreExecutors.java:456)
at com.google.common.util.concurrent.Futures$ImmediateFuture.addListener(Futures.java:153)
at com.google.common.util.concurrent.Futures.addCallback(Futures.java:1776)
at com.google.common.util.concurrent.Futures.addCallback(Futures.java:1713)
at com.datastax.driver.core.RequestHandler$SpeculativeExecution.query(RequestHandler.java:313)
at com.datastax.driver.core.RequestHandler$SpeculativeExecution.findNextHostAndQuery(RequestHandler.java:283)
at com.datastax.driver.core.RequestHandler.startNewExecution(RequestHandler.java:118)
at com.datastax.driver.core.RequestHandler.sendRequest(RequestHandler.java:98)
at com.datastax.driver.core.SessionManager.executeAsync(SessionManager.java:132)
at com.outworkers.phantom.ScalaGuavaAdapter$.statementToPromise(ScalaGuavaAdapter.scala:70)
at com.outworkers.phantom.ScalaGuavaAdapter$.statementToFuture(ScalaGuavaAdapter.scala:32)
at com.outworkers.phantom.ScalaGuavaAdapter$.fromGuava(ScalaGuavaAdapter.scala:90)
at com.outworkers.phantom.ScalaGuavaAdapter$.fromGuava(ScalaGuavaAdapter.scala:26)
at com.outworkers.phantom.builder.query.execution.GuavaAdapter$class.fromGuava(ExecutableStatements.scala:44)
at com.outworkers.phantom.ScalaGuavaAdapter$.fromGuava(ScalaGuavaAdapter.scala:26)
at com.outworkers.phantom.builder.query.execution.QueryInterface.future(QueryInterface.scala:71)
at com.outworkers.phantom.builder.query.execution.ResultQueryInterface.fetch(ResultQueryInterface.scala:131)
I tried configuring the contact point as suggested, but none of the numbers I put in the configuration seem to have any effect:
val connector = ContactPoint.local.withClusterBuilder(
  _.withoutJMXReporting().withoutMetrics().withPoolingOptions(
    new PoolingOptions()
      .setMaxConnectionsPerHost(HostDistance.LOCAL, 1)
      .setMaxConnectionsPerHost(HostDistance.REMOTE, 2)
      .setMaxRequestsPerConnection(HostDistance.LOCAL, 100)
      .setMaxRequestsPerConnection(HostDistance.REMOTE, 200)
  )
).keySpace(KeySpaceSerializer(keyspace).ifNotExists().`with`(replication eqs SimpleStrategy.replication_factor(1))
  .and(durable_writes eqs true))
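I don't think what you are seeing here has anything to do with streaming as such. The problem is that the large number of requests is exhausting the Datastax Java driver's internal connection pool, so the driver basically runs out of concurrent connections to Cassandra.
This is tunable to some extent: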
PoolingOptions.setMaxRequestsPerConnection(HostDistance, int): maximum number of requests per connection;
PoolingOptions.setMaxConnectionsPerHost(HostDistance, int): maximum number of connections in the pool;
PoolingOptions.setMaxQueueSize(int): maximum number of enqueued requests before this exception is thrown.
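To see how these three settings interact, here is a rough back-of-the-envelope sketch. It assumes that a host's pool can hold at most "connections × requests per connection" requests in flight plus "queue size" waiting requests before a BusyPoolException is raised. `PoolCapacity` is a hypothetical helper written for this illustration, not part of the driver.

```scala
object PoolCapacity {
  // Hypothetical helper: an upper bound on the requests one host's pool can
  // accept at the same time (in flight plus queued), under the assumption
  // stated above.
  def perHost(maxConnections: Int, maxRequestsPerConnection: Int, maxQueueSize: Int): Int =
    maxConnections * maxRequestsPerConnection + maxQueueSize
}
```

With the values from the question (1 local connection, 100 requests per connection, and the queue size of 256 reported in the exception), `PoolCapacity.perHost(1, 100, 256)` gives 356, far below the 100,000 queries that are started at once.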
You can set these options through the ClusterBuilder like this:
val connector = ContactPoint.local
  .noHeartbeat()
  .withClusterBuilder(
    _.withoutJMXReporting()
      .withoutMetrics()
      .withPoolingOptions(
        // Both setters belong to PoolingOptions, so chain them before closing the call
        new PoolingOptions()
          .setMaxConnectionsPerHost(HostDistance.LOCAL, 15)
          .setMaxRequestsPerConnection(HostDistance.LOCAL, 100)
      )
  ).keySpace(
    KeySpaceSerializer(space).ifNotExists()
      .`with`(replication eqs SimpleStrategy.replication_factor(1))
      .and(durable_writes eqs true)
  )
I haven't actually checked whether these values are all correct; I'm only showing you how the configuration looks in the DSL.
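Raising the pool limits only moves the ceiling, so it may also help to avoid starting all 100,000 queries at once. The following is a minimal sketch of one way to do that with plain Scala futures. It is not taken from Phantom or the driver: `ThrottledReads` and `inBatches` are hypothetical names, and the batch size is an assumption you would tune to stay below what the pool accepts.

```scala
import scala.concurrent.{ExecutionContext, Future}

object ThrottledReads {
  /** Applies `query` to `items` in groups of `batchSize`.
    * The queries of a group are started only after every query of the
    * previous group has completed, so at most `batchSize` are in flight.
    * Results are returned in the order of `items`.
    */
  def inBatches[A, B](items: Seq[A], batchSize: Int)(query: A => Future[B])(
      implicit ec: ExecutionContext): Future[Vector[B]] =
    items.grouped(batchSize).foldLeft(Future.successful(Vector.empty[B])) {
      (acc, batch) =>
        acc.flatMap { done =>
          // `query` is invoked here, inside flatMap, i.e. after `acc` finished
          Future.traverse(batch)(query).map(results => done ++ results)
        }
    }
}
```

With the code from the question this could be called as `ThrottledReads.inBatches(list, 200)(f => find(f.id, f.x, f.y))`, where 200 is an arbitrary illustrative value. A batch is only as fast as its slowest query, so a streaming or semaphore-based approach may give better throughput; the sketch favours simplicity.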