带有请求队列的 Kotlin 服务
A Kotlin service with request queue
我想设计一个具有以下内容的服务 API:
suspend fun getUsers(request: Request): List<User>
在幕后我会向服务器发送请求(不管如何,但可以说它是一个反应性的 WebClient
),但这里有一个技巧:我只能发送请求500毫秒,否则我会报错。
有人可以推荐我如何实现它,当我从它暂停的协程调用 getUsers
时,工作单元被添加到具有此方法的服务的某个队列中,然后在某个时间点执行并返回结果?
我假设我可以使用一些 ReceiveChannel
作为队列,为其元素创建一个 for
循环,其中包含一个 delay
,但我有点不知道该放在哪里这个逻辑。这应该像一个永远 运行 并被 getUsers
调用的后台方法吗?可能永远不会调用close
方法,所以这个方法也可以挂起,但是我如何将值从这个无限运行ning方法传回给需要结果的getUsers
呢?
编辑
目前我正在考虑这样的解决方案:
private const val REQUEST_INTERVAL = 500
@Service
class DelayedRequestSenderImpl<T> : DelayedRequestSender<T> {
private var lastRequestTime: LocalDateTime = LocalDateTime.now()
private val requestChannel: Channel<Deferred<T>> = Channel()
override suspend fun requestAsync(block: () -> T): Deferred<T> {
val deferred = GlobalScope.async(start = CoroutineStart.LAZY) { block() }
requestChannel.send(deferred)
return deferred
}
@PostConstruct
private fun startRequestProcessing() = GlobalScope.launch {
for (request in requestChannel) {
val now = LocalDateTime.now()
val diff = ChronoUnit.MILLIS.between(lastRequestTime, now)
if (diff < REQUEST_INTERVAL) {
delay(REQUEST_INTERVAL - diff)
lastRequestTime = now
}
request.start()
}
}
}
我在这里看到的问题是我必须泛化 class 以使 requestChannel
通用,因为请求的结果可能是任何东西。但这意味着 DelayedRequestSender
的每个实例都将绑定到特定类型。关于如何避免这种情况的任何建议?
编辑 2
这是一个精简版。我目前看到的唯一可能的流程是我们必须制作 @PostConstruct
方法 public 以便在我们想要或使用反射时编写任何测试。
我们的想法是不使用 GlobalScope
并且还有一个单独的 Job
作为处理方法。这是一个很好的方法吗?
interface DelayingSupplier {
suspend fun <T> supply(block: () -> T): T
}
@Service
class DelayingSupplierImpl(@Value("${vk.request.interval}") private val interval: Int) : DelayingSupplier {
private var lastRequestTime: LocalDateTime = LocalDateTime.now()
private val requestChannel: Channel<Deferred<*>> = Channel()
private val coroutineScope = CoroutineScope(EmptyCoroutineContext)
override suspend fun <T> supply(block: () -> T): T {
val deferred = coroutineScope.async(start = CoroutineStart.LAZY) { block() }
requestChannel.send(deferred)
return deferred.await()
}
@PostConstruct
fun startProcessing() = coroutineScope.launch(context = Job(coroutineScope.coroutineContext[Job])) {
for (request in requestChannel) {
val now = LocalDateTime.now()
val diff = ChronoUnit.MILLIS.between(lastRequestTime, now)
if (diff < interval) {
delay(interval - diff)
}
lastRequestTime = LocalDateTime.now()
request.start()
}
}
}
我会推荐:
- 将泛型下推到函数级别
- 使用 actor 而不是协程实现(但您可能更喜欢这个)。
无论哪种方式,此解决方案都可以让您使用队列的单个实例来处理所有请求的延迟,而不管 return 类型如何。 (抱歉,我重命名了一些东西以帮助我自己的概念化,希望这仍然有意义):
private const val REQUEST_INTERVAL = 500
interface DelayedRequestHandler {
suspend fun <T> handleWithDelay(block: () -> T): T
}
class DelayedRequestHandlerImpl(requestInterval: Int = REQUEST_INTERVAL) : DelayedRequestHandler, CoroutineScope {
private val job = Job()
override val coroutineContext = Dispatchers.Unconfined + job
private val delayedHandlerActor = delayedRequestHandlerActor(requestInterval)
override suspend fun <T> handleWithDelay(block: () -> T): T {
val result = CompletableDeferred<T>()
delayedHandlerActor.send(DelayedHandlerMsg(result, block))
return result.await()
}
}
private data class DelayedHandlerMsg<RESULT>(val result: CompletableDeferred<RESULT>, val block: () -> RESULT)
private fun CoroutineScope.delayedRequestHandlerActor(requestInterval: Int) = actor<DelayedHandlerMsg<*>>() {
var lastRequestTime: LocalDateTime = LocalDateTime.now()
for (message in channel) {
try {
println("got a message processing")
val now = LocalDateTime.now()
val diff = ChronoUnit.MILLIS.between(lastRequestTime, now)
if (diff < requestInterval) {
delay(requestInterval - diff)
}
lastRequestTime = LocalDateTime.now()
@Suppress("UNCHECKED_CAST")
val msgCast = message as DelayedHandlerMsg<Any?>
val result = msgCast.block()
println(result)
msgCast.result.complete(result)
} catch (e: Exception) {
message.result.completeExceptionally(e)
}
}
}
fun main() = runBlocking {
val mydelayHandler = DelayedRequestHandlerImpl(2000)
val jobs = List(10) {
launch {
mydelayHandler.handleWithDelay {
"Result $it"
}
}
}
jobs.forEach { it.join() }
}
所以这是我想出的最终实现。请注意 SupevisorJob
,因为我们不希望在其中一个请求失败时停止处理,这是完全可能的并且很好(至少在我的情况下)。
此外,@Laurence 建议的选项可能更好,但由于 API 被标记为已过时,我决定暂时不使用演员。
@Service
class DelayingRequestSenderImpl(@Value("${vk.request.interval}") private val interval: Int) : DelayingRequestSender {
private var lastRequestTime: LocalDateTime = LocalDateTime.now()
private val requestChannel: Channel<Deferred<*>> = Channel()
//SupervisorJob is used because we want to have continuous processing of requestChannel
//even if one of the requests fails
private val coroutineScope = CoroutineScope(SupervisorJob())
override suspend fun <T> request(block: () -> T): T {
val deferred = coroutineScope.async(start = CoroutineStart.LAZY) { block() }
requestChannel.send(deferred)
return deferred.await()
}
@PostConstruct
fun startProcessing() = coroutineScope.launch {
for (request in requestChannel) {
val now = LocalDateTime.now()
val diff = ChronoUnit.MILLIS.between(lastRequestTime, now)
if (diff < interval) {
delay(interval - diff)
}
lastRequestTime = LocalDateTime.now()
request.start()
}
}
}
我想设计一个具有以下内容的服务 API:
suspend fun getUsers(request: Request): List<User>
在幕后我会向服务器发送请求(不管如何,但可以说它是一个反应性的 WebClient
),但这里有一个技巧:我只能发送请求500毫秒,否则我会报错。
有人可以推荐我如何实现它,当我从它暂停的协程调用 getUsers
时,工作单元被添加到具有此方法的服务的某个队列中,然后在某个时间点执行并返回结果?
我假设我可以使用一些 ReceiveChannel
作为队列,为其元素创建一个 for
循环,其中包含一个 delay
,但我有点不知道该放在哪里这个逻辑。这应该像一个永远 运行 并被 getUsers
调用的后台方法吗?可能永远不会调用close
方法,所以这个方法也可以挂起,但是我如何将值从这个无限运行ning方法传回给需要结果的getUsers
呢?
编辑
目前我正在考虑这样的解决方案:
private const val REQUEST_INTERVAL = 500
@Service
class DelayedRequestSenderImpl<T> : DelayedRequestSender<T> {
private var lastRequestTime: LocalDateTime = LocalDateTime.now()
private val requestChannel: Channel<Deferred<T>> = Channel()
override suspend fun requestAsync(block: () -> T): Deferred<T> {
val deferred = GlobalScope.async(start = CoroutineStart.LAZY) { block() }
requestChannel.send(deferred)
return deferred
}
@PostConstruct
private fun startRequestProcessing() = GlobalScope.launch {
for (request in requestChannel) {
val now = LocalDateTime.now()
val diff = ChronoUnit.MILLIS.between(lastRequestTime, now)
if (diff < REQUEST_INTERVAL) {
delay(REQUEST_INTERVAL - diff)
lastRequestTime = now
}
request.start()
}
}
}
我在这里看到的问题是我必须泛化 class 以使 requestChannel
通用,因为请求的结果可能是任何东西。但这意味着 DelayedRequestSender
的每个实例都将绑定到特定类型。关于如何避免这种情况的任何建议?
编辑 2
这是一个精简版。我目前看到的唯一可能的流程是我们必须制作 @PostConstruct
方法 public 以便在我们想要或使用反射时编写任何测试。
我们的想法是不使用 GlobalScope
并且还有一个单独的 Job
作为处理方法。这是一个很好的方法吗?
interface DelayingSupplier {
suspend fun <T> supply(block: () -> T): T
}
@Service
class DelayingSupplierImpl(@Value("${vk.request.interval}") private val interval: Int) : DelayingSupplier {
private var lastRequestTime: LocalDateTime = LocalDateTime.now()
private val requestChannel: Channel<Deferred<*>> = Channel()
private val coroutineScope = CoroutineScope(EmptyCoroutineContext)
override suspend fun <T> supply(block: () -> T): T {
val deferred = coroutineScope.async(start = CoroutineStart.LAZY) { block() }
requestChannel.send(deferred)
return deferred.await()
}
@PostConstruct
fun startProcessing() = coroutineScope.launch(context = Job(coroutineScope.coroutineContext[Job])) {
for (request in requestChannel) {
val now = LocalDateTime.now()
val diff = ChronoUnit.MILLIS.between(lastRequestTime, now)
if (diff < interval) {
delay(interval - diff)
}
lastRequestTime = LocalDateTime.now()
request.start()
}
}
}
我会推荐:
- 将泛型下推到函数级别
- 使用 actor 而不是协程实现(但您可能更喜欢这个)。
无论哪种方式,此解决方案都可以让您使用队列的单个实例来处理所有请求的延迟,而不管 return 类型如何。 (抱歉,我重命名了一些东西以帮助我自己的概念化,希望这仍然有意义):
private const val REQUEST_INTERVAL = 500
interface DelayedRequestHandler {
suspend fun <T> handleWithDelay(block: () -> T): T
}
class DelayedRequestHandlerImpl(requestInterval: Int = REQUEST_INTERVAL) : DelayedRequestHandler, CoroutineScope {
private val job = Job()
override val coroutineContext = Dispatchers.Unconfined + job
private val delayedHandlerActor = delayedRequestHandlerActor(requestInterval)
override suspend fun <T> handleWithDelay(block: () -> T): T {
val result = CompletableDeferred<T>()
delayedHandlerActor.send(DelayedHandlerMsg(result, block))
return result.await()
}
}
private data class DelayedHandlerMsg<RESULT>(val result: CompletableDeferred<RESULT>, val block: () -> RESULT)
private fun CoroutineScope.delayedRequestHandlerActor(requestInterval: Int) = actor<DelayedHandlerMsg<*>>() {
var lastRequestTime: LocalDateTime = LocalDateTime.now()
for (message in channel) {
try {
println("got a message processing")
val now = LocalDateTime.now()
val diff = ChronoUnit.MILLIS.between(lastRequestTime, now)
if (diff < requestInterval) {
delay(requestInterval - diff)
}
lastRequestTime = LocalDateTime.now()
@Suppress("UNCHECKED_CAST")
val msgCast = message as DelayedHandlerMsg<Any?>
val result = msgCast.block()
println(result)
msgCast.result.complete(result)
} catch (e: Exception) {
message.result.completeExceptionally(e)
}
}
}
fun main() = runBlocking {
val mydelayHandler = DelayedRequestHandlerImpl(2000)
val jobs = List(10) {
launch {
mydelayHandler.handleWithDelay {
"Result $it"
}
}
}
jobs.forEach { it.join() }
}
所以这是我想出的最终实现。请注意 SupevisorJob
,因为我们不希望在其中一个请求失败时停止处理,这是完全可能的并且很好(至少在我的情况下)。
此外,@Laurence 建议的选项可能更好,但由于 API 被标记为已过时,我决定暂时不使用演员。
@Service
class DelayingRequestSenderImpl(@Value("${vk.request.interval}") private val interval: Int) : DelayingRequestSender {
private var lastRequestTime: LocalDateTime = LocalDateTime.now()
private val requestChannel: Channel<Deferred<*>> = Channel()
//SupervisorJob is used because we want to have continuous processing of requestChannel
//even if one of the requests fails
private val coroutineScope = CoroutineScope(SupervisorJob())
override suspend fun <T> request(block: () -> T): T {
val deferred = coroutineScope.async(start = CoroutineStart.LAZY) { block() }
requestChannel.send(deferred)
return deferred.await()
}
@PostConstruct
fun startProcessing() = coroutineScope.launch {
for (request in requestChannel) {
val now = LocalDateTime.now()
val diff = ChronoUnit.MILLIS.between(lastRequestTime, now)
if (diff < interval) {
delay(interval - diff)
}
lastRequestTime = LocalDateTime.now()
request.start()
}
}
}