当 Kotlin 通道变满时执行一段代码
Execute a piece of code when Kotlin channel becomes full
我想写一个简单的批处理器class。它有一个请求队列,等待这个队列变满或经过一些时间,然后才与数据库对话。
通过通道实现这个队列非常方便——这样我们的客户端就会在队列满时挂起。但是我怎么知道频道是否已满?
当然,我可以创建一个方法,将某些内容发送到通道,然后执行一些检查。下一步是将其封装在从 Channel 派生的 class 中。仍然很脏(我不清楚如何处理 onSend
/onReceive
)。有没有更优雅的解决方案?也许开箱即用?
通道接口声明了一个 isFull 属性,可以查询它以确定它是否已达到容量。
无论如何我看不到有回调函数来在达到容量时自动调用函数,但您可以定期检查此 isFull 属性 以查看它是否已达到容量。
这不是什么 out-of-the 框,但是可以使用 actor 轻松实现相应的批处理逻辑。您实际上并不需要 class(但如果您愿意,可以将此代码包装在 class 中)。您可以使用以下实现作为模板:
const val MAX_SIZE = 100 // max number of data items in batch
const val MAX_TIME = 1000 // max time (in ms) to wait
val batchActor = actor<Data> {
val batch = mutableListOf<Data>()
var deadline = 0L // deadline for sending this batch to DB
while (true) {
// when deadline is reached or size is exceeded, then force batch to DB
val remainingTime = deadline - System.currentTimeMillis()
if (batch.isNotEmpty() && remainingTime <= 0 || batch.size >= MAX_SIZE) {
saveToDB(batch)
batch.clear()
continue
}
// wait until items is received or timeout reached
select<Unit> {
// when received -> add to batch
channel.onReceive {
batch.add(it)
// init deadline on first item added to batch
if (batch.size == 1) deadline = System.currentTimeMillis() + MAX_TIME
}
// when timeout is reached just finish select, note: no timeout when batch is empty
if (batch.isNotEmpty()) onTimeout(remainingTime) {}
}
}
}
现在,只要您需要向数据库发送任何内容,actor 内部的逻辑就会负责批处理并将生成的批处理保存到数据库中。
我想写一个简单的批处理器class。它有一个请求队列,等待这个队列变满或经过一些时间,然后才与数据库对话。
通过通道实现这个队列非常方便——这样我们的客户端就会在队列满时挂起。但是我怎么知道频道是否已满?
当然,我可以创建一个方法,将某些内容发送到通道,然后执行一些检查。下一步是将其封装在从 Channel 派生的 class 中。仍然很脏(我不清楚如何处理 onSend
/onReceive
)。有没有更优雅的解决方案?也许开箱即用?
通道接口声明了一个 isFull 属性,可以查询它以确定它是否已达到容量。
无论如何我看不到有回调函数来在达到容量时自动调用函数,但您可以定期检查此 isFull 属性 以查看它是否已达到容量。
这不是什么 out-of-the 框,但是可以使用 actor 轻松实现相应的批处理逻辑。您实际上并不需要 class(但如果您愿意,可以将此代码包装在 class 中)。您可以使用以下实现作为模板:
const val MAX_SIZE = 100 // max number of data items in batch
const val MAX_TIME = 1000 // max time (in ms) to wait
val batchActor = actor<Data> {
val batch = mutableListOf<Data>()
var deadline = 0L // deadline for sending this batch to DB
while (true) {
// when deadline is reached or size is exceeded, then force batch to DB
val remainingTime = deadline - System.currentTimeMillis()
if (batch.isNotEmpty() && remainingTime <= 0 || batch.size >= MAX_SIZE) {
saveToDB(batch)
batch.clear()
continue
}
// wait until items is received or timeout reached
select<Unit> {
// when received -> add to batch
channel.onReceive {
batch.add(it)
// init deadline on first item added to batch
if (batch.size == 1) deadline = System.currentTimeMillis() + MAX_TIME
}
// when timeout is reached just finish select, note: no timeout when batch is empty
if (batch.isNotEmpty()) onTimeout(remainingTime) {}
}
}
}
现在,只要您需要向数据库发送任何内容,actor 内部的逻辑就会负责批处理并将生成的批处理保存到数据库中。