scanAll 操作与执行的性能

Performance on scanAll operation vs execute

我在我的 aerospike 恢复过程中使用了以下 truncate 实现,这让我非常清楚在操作期间受影响的记录数量:

 def truncate(startTime: Long, durableDelete: Boolean): Iterable[Int] = {

    // Setting LUT
    val calendar = Calendar.getInstance()

    logger.info(s"truncate(records s.t LUT <= $startTime = ${calendar.getTime}, durableDelete = $durableDelete) on ${config.toRecoverMap}")

    // Define Scan and Write Policies
    val writePolicy = new WritePolicy()
    val scanPolicy = new ScanPolicy()
    writePolicy.durableDelete = durableDelete
    scanPolicy.filterExp = Exp.build(Exp.le(Exp.lastUpdate(), Exp.`val`(calendar)))

    // Scan all records such as LUT <= startTime
    config.toRecoverMap.flatMap { case (namespace, mapOfSetsToBins) =>
      for ((set, bins) <- mapOfSetsToBins) yield {
        val recordCount = new AtomicInteger(0)
        client.scanAll(scanPolicy, namespace, set, new ScanCallback() {
          override def scanCallback(key: Key, record: Record): Unit = {
            val requiresNullify = bins.filter(record.bins.containsKey(_)).toSeq // Instead of making bulk requests which maybe not be needed and load AS
            if (requiresNullify.nonEmpty) {
              recordCount.incrementAndGet()
              client.put(writePolicy, key, requiresNullify.map(Bin.asNull): _*)
              logger.debug {
                val (nullified, remains) = record.bins.asScala.partition { case (key, _) => requiresNullify.contains(key) }
                s"(#$recordCount): Record $nullified bins of record with userKey: ${key.userKey}, digest: ${Buffer.bytesToHexString(key.digest)} nullified, remains: $remains"
              }
            }
          }
        })

问题是由于回调,操作花费了很多时间,并且在 production 环境中没有受到影响,我将实现更改为以下而不是花费大约 2 小时,时间缩短为 10 分钟。

def truncate(startTime: Long, durableDelete: Boolean): Unit = {

    // Setting LUT
    val calendar = Calendar.getInstance()

    logger.info(s"truncate(records s.t LUT <= $startTime = ${calendar.getTime}, durableDelete = $durableDelete) on ${config.toRecoverMap}")

    // Define Write Policy
    val writePolicy = new WritePolicy()
    writePolicy.durableDelete = durableDelete

    config.toRecoverMap.flatMap { case (namespace, mapOfSetsToBins) =>
      for ((set, bins) <- mapOfSetsToBins) yield {

        // Filter all elements s.t lastUpdate <= startTime on $set
        writePolicy.filterExp = Exp.build(
          Exp.and(
            Exp.le(Exp.lastUpdate(), Exp.`val`(calendar)),
            Exp.eq(Exp.setName(), Exp.`val`(set)))
        )

        val statement = new Statement
        statement.setNamespace(namespace)
        val toNullify = bins.map(Bin.asNull).map(Operation.put).toList
        client.execute(writePolicy, statement, toNullify: _*).waitTillComplete(10.seconds.toMillis.toInt, 1.hour.toMillis.toInt)
      }
    }
  }

但问题是我无法像第一种方法那样了解受影响的记录(查看 logger.debug

是否有解决方案如何运行 具有良好的性能并提供日志?

谢谢!

看起来您使用的是企业版,您的截断只考虑了 LUT。首选方法是使用 truncate API。这比扫描和持久删除方法有一个显着的优势,因为它不需要为每个删除的键保留一个逻辑删除条目,相反,它将有一个条目将集合中的所有记录标记为已删除。它也不需要调用“古墓丽影”,这是一种定期磁盘扫描,用于搜索不再标记设备上的死记录的墓碑(又名“纪念碑”)。通过 truncate 可以找到每个节点删除的记录数 truncated_records.

您可以使用 truncate 信息命令调用此截断方法。


顺便说一句,您可以通过在扫描策略中将 includeBinData 选项设置为 false 来显着加快第一种方法的速度。这导致 Aerospike 在扫描期间只需要读取和发送内存中的元数据。我相信如果您已将记录的密钥与记录一起存储,我们仍然需要读取设备。