在 RabbitListener 中发送命令后等待 10 秒
Wait 10 seconds after send command in RabbitListener
我的 RabbitListener 收到了计算结果。这是很多数据,所以我对其进行了批处理,然后将命令发送到 axon commandGateway。我想知道听众是否有可能在发送下一个命令之前等待几秒钟。我注意到我不能在这里使用 threadSleep。对于每个批次,它都可以像这种方法一样工作
private void myMethod() {
commandGateway.send(batchedData)
//wait 10seconds
这里有等待期的原因是什么?
如果原因是要等待 Axon Framework 处理命令中存在的所有数据,您可以改用 commandGateway.sendAndWait(Object command, ...)
。这将使当前线程等待命令执行。
如果它是一种批处理数据的机制,我建议保留一个 in-memory List
来对项目进行排队,然后使用 Spring 每 10 秒发送一个命令调度机制。我在 Kotlin 中创建了一个小示例来说明这一点:
@Service
class CalculationBatcher(
private val commandGateway: CommandGateway
) {
private val calculationQueue = LinkedList<Any>()
fun queueCalculation(calculation: Any) {
calculationQueue.add(calculation)
}
@Scheduled(fixedRate = 10000) // Send every 10 seconds
@PreDestroy // When destroying the application, send remaining events
fun sendCalculations() {
// Use pop here on the LinkedList while having items to prevent threading issues
val calculationsToSend = LinkedList<Any>()
while (calculationQueue.isNotEmpty()) {
calculationsToSend.push(calculationQueue.pop())
}
commandGateway.sendAndWait<Any>(MyEventsCommand(calculationsToSend), 10, TimeUnit.SECONDS)
}
data class MyEventsCommand(val events: List<Any>)
}
希望对您有所帮助。如果是其他原因,请告诉我!
我的 RabbitListener 收到了计算结果。这是很多数据,所以我对其进行了批处理,然后将命令发送到 axon commandGateway。我想知道听众是否有可能在发送下一个命令之前等待几秒钟。我注意到我不能在这里使用 threadSleep。对于每个批次,它都可以像这种方法一样工作
private void myMethod() {
commandGateway.send(batchedData)
//wait 10seconds
这里有等待期的原因是什么?
如果原因是要等待 Axon Framework 处理命令中存在的所有数据,您可以改用 commandGateway.sendAndWait(Object command, ...)
。这将使当前线程等待命令执行。
如果它是一种批处理数据的机制,我建议保留一个 in-memory List
来对项目进行排队,然后使用 Spring 每 10 秒发送一个命令调度机制。我在 Kotlin 中创建了一个小示例来说明这一点:
@Service
class CalculationBatcher(
private val commandGateway: CommandGateway
) {
private val calculationQueue = LinkedList<Any>()
fun queueCalculation(calculation: Any) {
calculationQueue.add(calculation)
}
@Scheduled(fixedRate = 10000) // Send every 10 seconds
@PreDestroy // When destroying the application, send remaining events
fun sendCalculations() {
// Use pop here on the LinkedList while having items to prevent threading issues
val calculationsToSend = LinkedList<Any>()
while (calculationQueue.isNotEmpty()) {
calculationsToSend.push(calculationQueue.pop())
}
commandGateway.sendAndWait<Any>(MyEventsCommand(calculationsToSend), 10, TimeUnit.SECONDS)
}
data class MyEventsCommand(val events: List<Any>)
}
希望对您有所帮助。如果是其他原因,请告诉我!