scanAll 操作与执行的性能
Performance on scanAll operation vs execute
我在我的 aerospike
恢复过程中使用了以下 truncate
实现,这让我非常清楚在操作期间受影响的记录数量:
def truncate(startTime: Long, durableDelete: Boolean): Iterable[Int] = {
// Setting LUT
val calendar = Calendar.getInstance()
logger.info(s"truncate(records s.t LUT <= $startTime = ${calendar.getTime}, durableDelete = $durableDelete) on ${config.toRecoverMap}")
// Define Scan and Write Policies
val writePolicy = new WritePolicy()
val scanPolicy = new ScanPolicy()
writePolicy.durableDelete = durableDelete
scanPolicy.filterExp = Exp.build(Exp.le(Exp.lastUpdate(), Exp.`val`(calendar)))
// Scan all records such as LUT <= startTime
config.toRecoverMap.flatMap { case (namespace, mapOfSetsToBins) =>
for ((set, bins) <- mapOfSetsToBins) yield {
val recordCount = new AtomicInteger(0)
client.scanAll(scanPolicy, namespace, set, new ScanCallback() {
override def scanCallback(key: Key, record: Record): Unit = {
val requiresNullify = bins.filter(record.bins.containsKey(_)).toSeq // Instead of making bulk requests which maybe not be needed and load AS
if (requiresNullify.nonEmpty) {
recordCount.incrementAndGet()
client.put(writePolicy, key, requiresNullify.map(Bin.asNull): _*)
logger.debug {
val (nullified, remains) = record.bins.asScala.partition { case (key, _) => requiresNullify.contains(key) }
s"(#$recordCount): Record $nullified bins of record with userKey: ${key.userKey}, digest: ${Buffer.bytesToHexString(key.digest)} nullified, remains: $remains"
}
}
}
})
问题是由于回调,操作花费了很多时间,并且在 production
环境中没有受到影响,我将实现更改为以下而不是花费大约 2
小时,时间缩短为 10
分钟。
def truncate(startTime: Long, durableDelete: Boolean): Unit = {
// Setting LUT
val calendar = Calendar.getInstance()
logger.info(s"truncate(records s.t LUT <= $startTime = ${calendar.getTime}, durableDelete = $durableDelete) on ${config.toRecoverMap}")
// Define Write Policy
val writePolicy = new WritePolicy()
writePolicy.durableDelete = durableDelete
config.toRecoverMap.flatMap { case (namespace, mapOfSetsToBins) =>
for ((set, bins) <- mapOfSetsToBins) yield {
// Filter all elements s.t lastUpdate <= startTime on $set
writePolicy.filterExp = Exp.build(
Exp.and(
Exp.le(Exp.lastUpdate(), Exp.`val`(calendar)),
Exp.eq(Exp.setName(), Exp.`val`(set)))
)
val statement = new Statement
statement.setNamespace(namespace)
val toNullify = bins.map(Bin.asNull).map(Operation.put).toList
client.execute(writePolicy, statement, toNullify: _*).waitTillComplete(10.seconds.toMillis.toInt, 1.hour.toMillis.toInt)
}
}
}
但问题是我无法像第一种方法那样了解受影响的记录(查看 logger.debug
)
是否有解决方案如何运行 具有良好的性能并提供日志?
谢谢!
看起来您使用的是企业版,您的截断只考虑了 LUT。首选方法是使用 truncate
API。这比扫描和持久删除方法有一个显着的优势,因为它不需要为每个删除的键保留一个逻辑删除条目,相反,它将有一个条目将集合中的所有记录标记为已删除。它也不需要调用“古墓丽影”,这是一种定期磁盘扫描,用于搜索不再标记设备上的死记录的墓碑(又名“纪念碑”)。通过 truncate 可以找到每个节点删除的记录数 truncated_records.
您可以使用 truncate 信息命令调用此截断方法。
顺便说一句,您可以通过在扫描策略中将 includeBinData
选项设置为 false
来显着加快第一种方法的速度。这导致 Aerospike 在扫描期间只需要读取和发送内存中的元数据。我相信如果您已将记录的密钥与记录一起存储,我们仍然需要读取设备。
我在我的 aerospike
恢复过程中使用了以下 truncate
实现,这让我非常清楚在操作期间受影响的记录数量:
def truncate(startTime: Long, durableDelete: Boolean): Iterable[Int] = {
// Setting LUT
val calendar = Calendar.getInstance()
logger.info(s"truncate(records s.t LUT <= $startTime = ${calendar.getTime}, durableDelete = $durableDelete) on ${config.toRecoverMap}")
// Define Scan and Write Policies
val writePolicy = new WritePolicy()
val scanPolicy = new ScanPolicy()
writePolicy.durableDelete = durableDelete
scanPolicy.filterExp = Exp.build(Exp.le(Exp.lastUpdate(), Exp.`val`(calendar)))
// Scan all records such as LUT <= startTime
config.toRecoverMap.flatMap { case (namespace, mapOfSetsToBins) =>
for ((set, bins) <- mapOfSetsToBins) yield {
val recordCount = new AtomicInteger(0)
client.scanAll(scanPolicy, namespace, set, new ScanCallback() {
override def scanCallback(key: Key, record: Record): Unit = {
val requiresNullify = bins.filter(record.bins.containsKey(_)).toSeq // Instead of making bulk requests which maybe not be needed and load AS
if (requiresNullify.nonEmpty) {
recordCount.incrementAndGet()
client.put(writePolicy, key, requiresNullify.map(Bin.asNull): _*)
logger.debug {
val (nullified, remains) = record.bins.asScala.partition { case (key, _) => requiresNullify.contains(key) }
s"(#$recordCount): Record $nullified bins of record with userKey: ${key.userKey}, digest: ${Buffer.bytesToHexString(key.digest)} nullified, remains: $remains"
}
}
}
})
问题是由于回调,操作花费了很多时间,并且在 production
环境中没有受到影响,我将实现更改为以下而不是花费大约 2
小时,时间缩短为 10
分钟。
def truncate(startTime: Long, durableDelete: Boolean): Unit = {
// Setting LUT
val calendar = Calendar.getInstance()
logger.info(s"truncate(records s.t LUT <= $startTime = ${calendar.getTime}, durableDelete = $durableDelete) on ${config.toRecoverMap}")
// Define Write Policy
val writePolicy = new WritePolicy()
writePolicy.durableDelete = durableDelete
config.toRecoverMap.flatMap { case (namespace, mapOfSetsToBins) =>
for ((set, bins) <- mapOfSetsToBins) yield {
// Filter all elements s.t lastUpdate <= startTime on $set
writePolicy.filterExp = Exp.build(
Exp.and(
Exp.le(Exp.lastUpdate(), Exp.`val`(calendar)),
Exp.eq(Exp.setName(), Exp.`val`(set)))
)
val statement = new Statement
statement.setNamespace(namespace)
val toNullify = bins.map(Bin.asNull).map(Operation.put).toList
client.execute(writePolicy, statement, toNullify: _*).waitTillComplete(10.seconds.toMillis.toInt, 1.hour.toMillis.toInt)
}
}
}
但问题是我无法像第一种方法那样了解受影响的记录(查看 logger.debug
)
是否有解决方案如何运行 具有良好的性能并提供日志?
谢谢!
看起来您使用的是企业版,您的截断只考虑了 LUT。首选方法是使用 truncate
API。这比扫描和持久删除方法有一个显着的优势,因为它不需要为每个删除的键保留一个逻辑删除条目,相反,它将有一个条目将集合中的所有记录标记为已删除。它也不需要调用“古墓丽影”,这是一种定期磁盘扫描,用于搜索不再标记设备上的死记录的墓碑(又名“纪念碑”)。通过 truncate 可以找到每个节点删除的记录数 truncated_records.
您可以使用 truncate 信息命令调用此截断方法。
顺便说一句,您可以通过在扫描策略中将 includeBinData
选项设置为 false
来显着加快第一种方法的速度。这导致 Aerospike 在扫描期间只需要读取和发送内存中的元数据。我相信如果您已将记录的密钥与记录一起存储,我们仍然需要读取设备。