Aerospike java scanAll 客户端失败
Aerospike java client failure on scanAll
我使用以下方法 truncate
来自 aerospike namespace.set.bins
的数据:
// Setting LUT
val calendar = Calendar.getInstance()
calendar.setTimeInMillis(startTime + 1262304000000L) // uses CITRUSLEAF_EPOCH - see https://discuss.aerospike.com/t/how-to-use-view-and-calulate-last-update-time-lut-for-the-truncate-command/4330
logger.info(s"truncate($startTime = ${calendar.getTime}, durableDelete = $durableDelete) on ${config.toRecoverMap}")
// Define Scan and Write Policies
val writePolicy = new WritePolicy()
val scanPolicy = new ScanPolicy()
writePolicy.durableDelete = durableDelete
scanPolicy.filterExp = Exp.build(Exp.le(Exp.lastUpdate(), Exp.`val`(calendar)))
// Scan all records such as LUT <= startTime
config.toRecoverMap.flatMap { case (namespace, mapOfSetsToBins) =>
for ((set, bins) <- mapOfSetsToBins) yield {
val recordCount = new AtomicInteger(0)
client.scanAll(scanPolicy, namespace, set, new ScanCallback() {
override def scanCallback(key: Key, record: Record): Unit = {
val requiresNullify = bins.filter(record.bins.containsKey(_)).distinct // Instead of making bulk requests which maybe not be needed and load AS
if (requiresNullify.nonEmpty) {
client.put(writePolicy, key, requiresNullify.map(Bin.asNull): _*)
logger.debug(s"${recordCount.incrementAndGet()}: (${requiresNullify.mkString(",")}) Bins of Record: $record with $key are set to NULL")
}
}
})
logger.info(s"Totally $recordCount records affected during the truncate operation on $namespace.$set.$bins")
recordCount.get
}
}
}
失败于:
...
2021-08-08 16:51:30,551 [Aerospike-6] DEBUG c.d.a.c.r.services.AerospikeService.scanCallback(55) - 33950: (IsActive) Bins of Record: (gen:3),(exp:0),(bins:(IsActive:0)) with test-recovery-set-multi-1:null:95001b26e70dbb35e1487802ebbc857eceb92246 are set to NULL
原因:
Error -11,6,0,30000,0,5: Max retries exceeded: 5
com.aerospike.client.AerospikeException: Error -11,6,0,30000,0,5: Max retries exceeded: 5
at com.aerospike.client.query.PartitionTracker.isComplete(PartitionTracker.java:282)
at com.aerospike.client.command.ScanExecutor.scanPartitions(ScanExecutor.java:70)
at com.aerospike.client.AerospikeClient.scanAll(AerospikeClient.java:1519)
at com.aerospike.connect.reloader.services.AerospikeService.$anonfun$truncate(AerospikeService.scala:50)
at com.aerospike.connect.reloader.services.AerospikeService.$anonfun$truncate$adapted(AerospikeService.scala:48)
at scala.collection.Iterator$$anon.next(Iterator.scala:575)
at scala.collection.immutable.List.prependedAll(List.scala:153)
at scala.collection.immutable.List$.from(List.scala:651)
at scala.collection.immutable.List$.from(List.scala:648)
at scala.collection.IterableFactory$Delegate.from(Factory.scala:288)
at scala.collection.immutable.Iterable$.from(Iterable.scala:35)
at scala.collection.immutable.Iterable$.from(Iterable.scala:32)
at scala.collection.IterableOps$WithFilter.map(Iterable.scala:884)
at com.aerospike.connect.reloader.services.AerospikeService.$anonfun$truncate(AerospikeService.scala:48)
at scala.collection.StrictOptimizedIterableOps.flatMap(StrictOptimizedIterableOps.scala:117)
at scala.collection.StrictOptimizedIterableOps.flatMap$(StrictOptimizedIterableOps.scala:104)
at scala.collection.immutable.Map$Map1.flatMap(Map.scala:241)
at com.aerospike.connect.reloader.services.AerospikeService.truncate(AerospikeService.scala:47)
at com.aerospike.connect.reloader.tests.services.AerospikeServiceSpec.$anonfun$new(AerospikeServiceSpec.scala:23)
at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
at org.scalatest.Transformer.apply(Transformer.scala:20)
at org.scalatest.wordspec.AnyWordSpecLike$$anon.apply(AnyWordSpecLike.scala:1077)
at org.scalatest.TestSuite.withFixture(TestSuite.scala:196)
at org.scalatest.TestSuite.withFixture$(TestSuite.scala:195)
at com.aerospike.connect.reloader.tests.services.AerospikeServiceSpec.withFixture(AerospikeServiceSpec.scala:13)
at org.scalatest.wordspec.AnyWordSpecLike.invokeWithFixture(AnyWordSpecLike.scala:1075)
at org.scalatest.wordspec.AnyWordSpecLike.$anonfun$runTest(AnyWordSpecLike.scala:1087)
at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
at org.scalatest.wordspec.AnyWordSpecLike.runTest(AnyWordSpecLike.scala:1087)
at org.scalatest.wordspec.AnyWordSpecLike.runTest$(AnyWordSpecLike.scala:1069)
at com.aerospike.connect.reloader.tests.services.AerospikeServiceSpec.runTest(AerospikeServiceSpec.scala:13)
at org.scalatest.wordspec.AnyWordSpecLike.$anonfun$runTests(AnyWordSpecLike.scala:1146)
at org.scalatest.SuperEngine.$anonfun$runTestsInBranch(Engine.scala:413)
at scala.collection.immutable.List.foreach(List.scala:333)
at org.scalatest.SuperEngine.traverseSubNodes(Engine.scala:401)
at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:390)
at org.scalatest.SuperEngine.$anonfun$runTestsInBranch(Engine.scala:427)
at scala.collection.immutable.List.foreach(List.scala:333)
at org.scalatest.SuperEngine.traverseSubNodes(Engine.scala:401)
at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
at org.scalatest.wordspec.AnyWordSpecLike.runTests(AnyWordSpecLike.scala:1146)
at org.scalatest.wordspec.AnyWordSpecLike.runTests$(AnyWordSpecLike.scala:1145)
at com.aerospike.connect.reloader.tests.services.AerospikeServiceSpec.runTests(AerospikeServiceSpec.scala:13)
at org.scalatest.Suite.run(Suite.scala:1112)
at org.scalatest.Suite.run$(Suite.scala:1094)
at com.aerospike.connect.reloader.tests.services.AerospikeServiceSpec.org$scalatest$BeforeAndAfterAll$$super$run(AerospikeServiceSpec.scala:13)
at org.scalatest.BeforeAndAfterAll.liftedTree1(BeforeAndAfterAll.scala:213)
at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
at com.aerospike.connect.reloader.tests.services.AerospikeServiceSpec.org$scalatest$wordspec$AnyWordSpecLike$$super$run(AerospikeServiceSpec.scala:13)
at org.scalatest.wordspec.AnyWordSpecLike.$anonfun$run(AnyWordSpecLike.scala:1191)
at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
at org.scalatest.wordspec.AnyWordSpecLike.run(AnyWordSpecLike.scala:1191)
at org.scalatest.wordspec.AnyWordSpecLike.run$(AnyWordSpecLike.scala:1189)
at com.aerospike.connect.reloader.tests.services.AerospikeServiceSpec.run(AerospikeServiceSpec.scala:13)
at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun(Runner.scala:1320)
at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$adapted(Runner.scala:1314)
at scala.collection.immutable.List.foreach(List.scala:333)
at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1314)
at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter(Runner.scala:993)
at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$adapted(Runner.scala:971)
at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1480)
at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:971)
at org.scalatest.tools.Runner$.run(Runner.scala:798)
at org.scalatest.tools.Runner.run(Runner.scala)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2or3(ScalaTestRunner.java:38)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:25)
知道为什么会这样吗?
LUT 方法:
def calculateCurrentLUT(): Long = {
logger.info("calculateCurrentLUTs() Triggered")
val policy = new WritePolicy()
policy.setTimeout(config.operationTimeoutInMillis)
val key = new Key(config.toRecover.head.namespace, AerospikeConfiguration.dummySetName, AerospikeConfiguration.dummyKey)
client.put(policy, key, new Bin(AerospikeConfiguration.dummyBin, "Used by the Recovery process to calculate current machine startTime"))
client.execute(policy, key, AerospikeConfiguration.packageName, "getLUT").asInstanceOf[Long]
}
与:
def registerUDFs(): RegisterTask = {
logger.info(s"registerUDFs() Triggered")
val policy = new WritePolicy()
policy.setTimeout(config.operationTimeoutInMillis)
client.registerUdfString(policy, """
|function getLUT(r)
| return record.last_update_time(r)
|end
|""", AerospikeConfiguration.packageName + ".lua", Language.LUA)
}
AerospikeException: Error -11,6,0,30000,0,5: Max retries exceeded: 5
表示-11:错误代码,此操作的最大重试次数超过指定值。显示 6 次迭代(orig+maxretries)并且您指定最大重试次数为 5。您的连接设置为:0 - 对于 connectTimeout - 等待创建初始套接字,0 是默认值,30000 或 30s 是关闭空闲套接字的时间,0 是此扫描的总超时是操作 - 0 表示不超时,这对扫描来说是正确的,5 是你重试的次数 - 看起来服务器没有在 30 秒内响应客户端扫描调用并且客户端关闭空闲套接字并重试并5 次重试后抛出异常。明显有问题 - 检查服务器日志以获取更多线索。例如您是否使用支持扫描表达式的正确服务器版本?其次,我会检查您对 LUT 比较表达式的计算。如果过滤器表达式的计算结果为 false,扫描将在完成时 return 一个 EOF,没有匹配的记录 - 但如果在此之前套接字超时,扫描将进入重试。
我使用以下方法 truncate
来自 aerospike namespace.set.bins
的数据:
// Setting LUT
val calendar = Calendar.getInstance()
calendar.setTimeInMillis(startTime + 1262304000000L) // uses CITRUSLEAF_EPOCH - see https://discuss.aerospike.com/t/how-to-use-view-and-calulate-last-update-time-lut-for-the-truncate-command/4330
logger.info(s"truncate($startTime = ${calendar.getTime}, durableDelete = $durableDelete) on ${config.toRecoverMap}")
// Define Scan and Write Policies
val writePolicy = new WritePolicy()
val scanPolicy = new ScanPolicy()
writePolicy.durableDelete = durableDelete
scanPolicy.filterExp = Exp.build(Exp.le(Exp.lastUpdate(), Exp.`val`(calendar)))
// Scan all records such as LUT <= startTime
config.toRecoverMap.flatMap { case (namespace, mapOfSetsToBins) =>
for ((set, bins) <- mapOfSetsToBins) yield {
val recordCount = new AtomicInteger(0)
client.scanAll(scanPolicy, namespace, set, new ScanCallback() {
override def scanCallback(key: Key, record: Record): Unit = {
val requiresNullify = bins.filter(record.bins.containsKey(_)).distinct // Instead of making bulk requests which maybe not be needed and load AS
if (requiresNullify.nonEmpty) {
client.put(writePolicy, key, requiresNullify.map(Bin.asNull): _*)
logger.debug(s"${recordCount.incrementAndGet()}: (${requiresNullify.mkString(",")}) Bins of Record: $record with $key are set to NULL")
}
}
})
logger.info(s"Totally $recordCount records affected during the truncate operation on $namespace.$set.$bins")
recordCount.get
}
}
}
失败于:
...
2021-08-08 16:51:30,551 [Aerospike-6] DEBUG c.d.a.c.r.services.AerospikeService.scanCallback(55) - 33950: (IsActive) Bins of Record: (gen:3),(exp:0),(bins:(IsActive:0)) with test-recovery-set-multi-1:null:95001b26e70dbb35e1487802ebbc857eceb92246 are set to NULL
原因:
Error -11,6,0,30000,0,5: Max retries exceeded: 5
com.aerospike.client.AerospikeException: Error -11,6,0,30000,0,5: Max retries exceeded: 5
at com.aerospike.client.query.PartitionTracker.isComplete(PartitionTracker.java:282)
at com.aerospike.client.command.ScanExecutor.scanPartitions(ScanExecutor.java:70)
at com.aerospike.client.AerospikeClient.scanAll(AerospikeClient.java:1519)
at com.aerospike.connect.reloader.services.AerospikeService.$anonfun$truncate(AerospikeService.scala:50)
at com.aerospike.connect.reloader.services.AerospikeService.$anonfun$truncate$adapted(AerospikeService.scala:48)
at scala.collection.Iterator$$anon.next(Iterator.scala:575)
at scala.collection.immutable.List.prependedAll(List.scala:153)
at scala.collection.immutable.List$.from(List.scala:651)
at scala.collection.immutable.List$.from(List.scala:648)
at scala.collection.IterableFactory$Delegate.from(Factory.scala:288)
at scala.collection.immutable.Iterable$.from(Iterable.scala:35)
at scala.collection.immutable.Iterable$.from(Iterable.scala:32)
at scala.collection.IterableOps$WithFilter.map(Iterable.scala:884)
at com.aerospike.connect.reloader.services.AerospikeService.$anonfun$truncate(AerospikeService.scala:48)
at scala.collection.StrictOptimizedIterableOps.flatMap(StrictOptimizedIterableOps.scala:117)
at scala.collection.StrictOptimizedIterableOps.flatMap$(StrictOptimizedIterableOps.scala:104)
at scala.collection.immutable.Map$Map1.flatMap(Map.scala:241)
at com.aerospike.connect.reloader.services.AerospikeService.truncate(AerospikeService.scala:47)
at com.aerospike.connect.reloader.tests.services.AerospikeServiceSpec.$anonfun$new(AerospikeServiceSpec.scala:23)
at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
at org.scalatest.Transformer.apply(Transformer.scala:20)
at org.scalatest.wordspec.AnyWordSpecLike$$anon.apply(AnyWordSpecLike.scala:1077)
at org.scalatest.TestSuite.withFixture(TestSuite.scala:196)
at org.scalatest.TestSuite.withFixture$(TestSuite.scala:195)
at com.aerospike.connect.reloader.tests.services.AerospikeServiceSpec.withFixture(AerospikeServiceSpec.scala:13)
at org.scalatest.wordspec.AnyWordSpecLike.invokeWithFixture(AnyWordSpecLike.scala:1075)
at org.scalatest.wordspec.AnyWordSpecLike.$anonfun$runTest(AnyWordSpecLike.scala:1087)
at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
at org.scalatest.wordspec.AnyWordSpecLike.runTest(AnyWordSpecLike.scala:1087)
at org.scalatest.wordspec.AnyWordSpecLike.runTest$(AnyWordSpecLike.scala:1069)
at com.aerospike.connect.reloader.tests.services.AerospikeServiceSpec.runTest(AerospikeServiceSpec.scala:13)
at org.scalatest.wordspec.AnyWordSpecLike.$anonfun$runTests(AnyWordSpecLike.scala:1146)
at org.scalatest.SuperEngine.$anonfun$runTestsInBranch(Engine.scala:413)
at scala.collection.immutable.List.foreach(List.scala:333)
at org.scalatest.SuperEngine.traverseSubNodes(Engine.scala:401)
at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:390)
at org.scalatest.SuperEngine.$anonfun$runTestsInBranch(Engine.scala:427)
at scala.collection.immutable.List.foreach(List.scala:333)
at org.scalatest.SuperEngine.traverseSubNodes(Engine.scala:401)
at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
at org.scalatest.wordspec.AnyWordSpecLike.runTests(AnyWordSpecLike.scala:1146)
at org.scalatest.wordspec.AnyWordSpecLike.runTests$(AnyWordSpecLike.scala:1145)
at com.aerospike.connect.reloader.tests.services.AerospikeServiceSpec.runTests(AerospikeServiceSpec.scala:13)
at org.scalatest.Suite.run(Suite.scala:1112)
at org.scalatest.Suite.run$(Suite.scala:1094)
at com.aerospike.connect.reloader.tests.services.AerospikeServiceSpec.org$scalatest$BeforeAndAfterAll$$super$run(AerospikeServiceSpec.scala:13)
at org.scalatest.BeforeAndAfterAll.liftedTree1(BeforeAndAfterAll.scala:213)
at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
at com.aerospike.connect.reloader.tests.services.AerospikeServiceSpec.org$scalatest$wordspec$AnyWordSpecLike$$super$run(AerospikeServiceSpec.scala:13)
at org.scalatest.wordspec.AnyWordSpecLike.$anonfun$run(AnyWordSpecLike.scala:1191)
at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
at org.scalatest.wordspec.AnyWordSpecLike.run(AnyWordSpecLike.scala:1191)
at org.scalatest.wordspec.AnyWordSpecLike.run$(AnyWordSpecLike.scala:1189)
at com.aerospike.connect.reloader.tests.services.AerospikeServiceSpec.run(AerospikeServiceSpec.scala:13)
at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun(Runner.scala:1320)
at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$adapted(Runner.scala:1314)
at scala.collection.immutable.List.foreach(List.scala:333)
at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1314)
at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter(Runner.scala:993)
at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$adapted(Runner.scala:971)
at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1480)
at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:971)
at org.scalatest.tools.Runner$.run(Runner.scala:798)
at org.scalatest.tools.Runner.run(Runner.scala)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2or3(ScalaTestRunner.java:38)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:25)
知道为什么会这样吗?
LUT 方法:
def calculateCurrentLUT(): Long = {
logger.info("calculateCurrentLUTs() Triggered")
val policy = new WritePolicy()
policy.setTimeout(config.operationTimeoutInMillis)
val key = new Key(config.toRecover.head.namespace, AerospikeConfiguration.dummySetName, AerospikeConfiguration.dummyKey)
client.put(policy, key, new Bin(AerospikeConfiguration.dummyBin, "Used by the Recovery process to calculate current machine startTime"))
client.execute(policy, key, AerospikeConfiguration.packageName, "getLUT").asInstanceOf[Long]
}
与:
def registerUDFs(): RegisterTask = {
logger.info(s"registerUDFs() Triggered")
val policy = new WritePolicy()
policy.setTimeout(config.operationTimeoutInMillis)
client.registerUdfString(policy, """
|function getLUT(r)
| return record.last_update_time(r)
|end
|""", AerospikeConfiguration.packageName + ".lua", Language.LUA)
}
AerospikeException: Error -11,6,0,30000,0,5: Max retries exceeded: 5
表示-11:错误代码,此操作的最大重试次数超过指定值。显示 6 次迭代(orig+maxretries)并且您指定最大重试次数为 5。您的连接设置为:0 - 对于 connectTimeout - 等待创建初始套接字,0 是默认值,30000 或 30s 是关闭空闲套接字的时间,0 是此扫描的总超时是操作 - 0 表示不超时,这对扫描来说是正确的,5 是你重试的次数 - 看起来服务器没有在 30 秒内响应客户端扫描调用并且客户端关闭空闲套接字并重试并5 次重试后抛出异常。明显有问题 - 检查服务器日志以获取更多线索。例如您是否使用支持扫描表达式的正确服务器版本?其次,我会检查您对 LUT 比较表达式的计算。如果过滤器表达式的计算结果为 false,扫描将在完成时 return 一个 EOF,没有匹配的记录 - 但如果在此之前套接字超时,扫描将进入重试。