Delay() accuracy issues / Weird behavior of job scheduler
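I'm currently trying to build a job scheduler, shown below. My goal is to schedule the launch of arbitrary functions (here of type (Long) -> Unit) as accurately in time as possible, ideally with sub-millisecond accuracy.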
import java.util.*
import kotlinx.coroutines.*
import java.util.concurrent.PriorityBlockingQueue
import kotlin.math.max
import java.time.Instant

fun nowInMicrosSinceEpoch() : Long {
    val now = Instant.now()
    return now.toEpochMilli() * 1000L + (now.getNano().toLong() / 1000L)
}

open class TimeCallback(open var time : Long, open val callback : (Long) -> Unit) {
    open fun run(){
        callback(time)
    }
    override fun toString() : String {
        return "(TimeCallback - T:${time/1000L})"
    }
}

class PulseCallback(override var time : Long,
                    override val callback : (Long) -> Unit,
                    val pulsePeriod : Long,
                    val callbackQueue : AbstractQueue<TimeCallback>) : TimeCallback(time, callback) {
    override fun run(){
        callback(time)
        time += pulsePeriod
        callbackQueue.add(this)
    }
    override fun toString() : String {
        return "(PulseCallback - T:${time/1000L} - PP:${pulsePeriod/1000L})"
    }
}

abstract class Clock {
    protected abstract var currentTime: Long
    protected val comparator : Comparator<TimeCallback> = compareBy<TimeCallback> { x -> x.time }
    abstract fun start()
    abstract fun stop()
    abstract fun addCallback(time: Long, callback: (Long) -> Unit)
    abstract fun addPulseCallback(time: Long, pulsePeriod: Long, callback: (Long) -> Unit)
    abstract fun getTime() : Long
}

class LiveClock : Clock() {
    override var currentTime : Long = nowInMicrosSinceEpoch()
    private val callbacks : PriorityBlockingQueue<TimeCallback> = PriorityBlockingQueue<TimeCallback>(10000, comparator)
    private var clockCoroutine : Job? = null

    override fun start(){
        clockCoroutine = GlobalScope.launch {
            try{
                var waitTime : Long
                while(true) {
                    println(callbacks)
                    val callback: TimeCallback = callbacks.take()
                    currentTime = nowInMicrosSinceEpoch()
                    waitTime = max(callback.time - currentTime, 0L) / 1000L
                    println("Now is ${currentTime/1000L}, waiting $waitTime ms until ${callback.time/1000L}")
                    delay(waitTime)
                    callback.run()
                }
            } finally {
                println("Clock was stopped by CancellationException.")
            }
        }
    }

    override fun stop(){
        // Cannot stop before starting!
        clockCoroutine!!.cancel()
    }

    override fun addCallback(time: Long, callback: (Long) -> Unit){
        callbacks.add(TimeCallback(
            time = time,
            callback = callback
        ))
    }

    override fun addPulseCallback(firstPulse: Long, pulsePeriod: Long, callback: (Long) -> Unit){
        callbacks.add(PulseCallback(
            time = firstPulse,
            pulsePeriod = pulsePeriod,
            callback = callback,
            callbackQueue = callbacks
        ))
    }

    override fun getTime() : Long {
        return nowInMicrosSinceEpoch()
    }
}

fun printTest(t : Long){
    println("Time difference: ${nowInMicrosSinceEpoch()/1000L - (t/1000L)} ms")
}

fun main(args: Array<String>) {
    val clock = LiveClock()
    clock.addPulseCallback(nowInMicrosSinceEpoch(), 1000*1000L, ::printTest)
    clock.addPulseCallback(nowInMicrosSinceEpoch(), 500*1000L, ::printTest)
    clock.start()
    runBlocking {
        // Run for 100 seconds...
        delay(100000L)
    }
}
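However, even with the very simple example above (in main()), I get significant differences between the scheduled time and the time at which the scheduled function actually runs. Some callbacks even run before their scheduled time (see the last "Time difference" line below, which is negative), and that remains a mystery to me. How can a callback run before the time for which delay() was asked to wait?
Thanks!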
[(PulseCallback - T:1547692545172 - PP:1000), (PulseCallback - T:1547692545184 - PP:500)]
Now is 1547692545262, waiting 0 ms until 1547692545172
1547692545264 - Time difference: 92 ms
[(PulseCallback - T:1547692545184 - PP:500), (PulseCallback - T:1547692546172 - PP:1000)]
Now is 1547692545264, waiting 0 ms until 1547692545184
1547692545264 - Time difference: 80 ms
[(PulseCallback - T:1547692545684 - PP:500), (PulseCallback - T:1547692546172 - PP:1000)]
Now is 1547692545264, waiting 420 ms until 1547692545684
1547692546110 - Time difference: 426 ms
[(PulseCallback - T:1547692546172 - PP:1000), (PulseCallback - T:1547692546184 - PP:500)]
Now is 1547692546110, waiting 62 ms until 1547692546172
1547692546234 - Time difference: 62 ms
[(PulseCallback - T:1547692546184 - PP:500), (PulseCallback - T:1547692547172 - PP:1000)]
Now is 1547692546234, waiting 0 ms until 1547692546184
1547692546234 - Time difference: 50 ms
[(PulseCallback - T:1547692546684 - PP:500), (PulseCallback - T:1547692547172 - PP:1000)]
Now is 1547692546234, waiting 450 ms until 1547692546684
1547692546136 - Time difference: -548 ms
[(PulseCallback - T:1547692547172 - PP:1000), (PulseCallback - T:1547692547184 - PP:500)]
Now is 1547692546136, waiting 1036 ms until 1547692547172
The implementation of nowInMicrosSinceEpoch() is wrong: the millisecond value is applied twice.
To show this, here is Java code that prints the values used in nowInMicrosSinceEpoch():
Instant now = Instant.now();
System.out.println(now);
System.out.printf("%23d toEpochMilli()%n", now.toEpochMilli());
System.out.printf("%26d toEpochMilli() * 1000 = a%n", now.toEpochMilli() * 1000L);
System.out.printf("%29d getNano()%n", now.getNano());
System.out.printf("%26d getNano() / 1000 = b%n", now.getNano() / 1000L);
System.out.printf("%26d a + b%n", now.toEpochMilli() * 1000L + now.getNano() / 1000L);
Output:
2019-02-02T00:16:58.999999999Z
          1549066618999 toEpochMilli()
          1549066618999000 toEpochMilli() * 1000 = a
                    999999999 getNano()
                    999999 getNano() / 1000 = b
          1549066619998999 a + b
So when the clock rolls over from x:58.999999999Z to x:59.000000000Z, you get:
2019-02-02T00:16:59.000000000Z
          1549066619000 toEpochMilli()
          1549066619000000 toEpochMilli() * 1000 = a
                    000000000 getNano()
                    000000 getNano() / 1000 = b
          1549066619000000 a + b
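The value returned 1 nanosecond later is 998999 microseconds earlier than the previous value.
The computed value runs at double speed and jumps back by 1 second every second. That is why some callbacks in the question appear to run before their scheduled time.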
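The double-speed and jump-back behaviour can be reproduced without waiting on a real clock. The following is only a sketch: the class name BuggyMicrosDemo and the method name buggyMicros are made up for illustration, and the fixed epoch second is simply the one from the output above. It samples the question's formula every 250 ms across two seconds and prints how far the computed value moved between samples.

```java
import java.time.Instant;

// Hypothetical demo class; not part of the original question or answer.
final class BuggyMicrosDemo {

    // The formula from the question. toEpochMilli() already contains the
    // milliseconds of the current second, and getNano() contains them again.
    static long buggyMicros(Instant t) {
        return t.toEpochMilli() * 1000L + t.getNano() / 1000L;
    }

    public static void main(String[] args) {
        // Fixed starting point so the run is deterministic (assumed value,
        // taken from the sample output above).
        Instant start = Instant.ofEpochSecond(1_549_066_618L, 0);
        long previous = buggyMicros(start);

        for (int step = 1; step <= 8; step++) {
            // Each step advances real time by 250 ms = 250000 us.
            Instant t = start.plusMillis(250L * step);
            long current = buggyMicros(t);
            System.out.printf("%s  delta = %d us%n", t, current - previous);
            previous = current;
        }
    }
}
```

Within a second each 250 ms step moves the computed value forward by 500000 us, and the step that crosses a second boundary moves it backward by 500000 us. Sampling closer to the boundary makes the backward jump approach a full second, as in the 998999 us case above.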
The correct formula is (Java):
Instant now = Instant.now();
return now.getEpochSecond() * 1000000L + now.getNano() / 1000L;
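As a quick check of the corrected formula, here is a small self-contained sketch. The class name MicrosSinceEpoch and both method names are assumptions made for this example. It evaluates the formula on the two instants around the rollover discussed above and compares it with ChronoUnit.MICROS.between, a standard-library way to get the same number that is added here as an alternative and is not part of the original answer.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;

// Hypothetical helper class; names are illustrative only.
final class MicrosSinceEpoch {

    // Corrected formula: whole seconds scaled to microseconds, plus the
    // sub-second part (nanoseconds) truncated to microseconds.
    static long micros(Instant t) {
        return t.getEpochSecond() * 1_000_000L + t.getNano() / 1000L;
    }

    // Alternative using the standard library directly.
    static long microsViaChronoUnit(Instant t) {
        return ChronoUnit.MICROS.between(Instant.EPOCH, t);
    }

    public static void main(String[] args) {
        Instant before = Instant.ofEpochSecond(1_549_066_618L, 999_999_999);
        Instant after = before.plusNanos(1); // rolls over to the next second

        System.out.println(micros(before)); // 1549066618999999
        System.out.println(micros(after));  // 1549066619000000
        System.out.println(micros(after) == microsViaChronoUnit(after)); // true
    }
}
```

With this formula the value advances by exactly 1 microsecond across the second boundary instead of jumping back. In the Kotlin code from the question, the same expression would be written as now.epochSecond * 1_000_000L + now.nano / 1000L.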