为什么这个调度功能没有达到预期效果?
Why does this scheduling function not work as expected?
我正在编写一个小应用程序来验证一些网络应用程序是否正常。为此,我编写了一个简单的 HealthCheck
摘要 class,它需要两个函数,is_healthy
和 on_fail
。这是我注册两次健康检查的代码:
import threading
import sched
import time
def register_health_check(checker, interval):
s = sched.scheduler(time.time, time.sleep)
s.enter(interval, 1, _check, (s,checker,interval))
t = threading.Thread(target=s.run, daemon=True)
t.start()
def _check(sc, checker, interval):
if checker.is_healthy():
sc.enter(interval, 1, _check, (sc,checker,interval))
else:
checker.on_fail()
然后,要注册健康检查,我会这样做:
register_health_check(CheckMain('foo'), 1)
register_health_check(CheckMain('bar'), 5)
CheckMain
的 is_healthy
只是分别打印 checking foo
或 checking bar
。我期望的输出是:
checking foo // 1 second
checking foo // 2 seconds
checking foo // 3 seconds
checking foo // 4 seconds
checking foo // 5 seconds
checking bar // also 5 seconds-ish
checking foo // 6 seconds
checking foo // etc.
...
实际输出为:
checking foo
checking foo
checking foo
checking foo
checking foo
checking foo
checking foo
checking foo
checking bar
checking foo
checking bar
checking foo
checking foo
checking foo
checking foo
checking foo
checking foo
checking foo
checking foo
我的实现有什么问题?
由于没有人回答,我认为多线程对于这项任务来说太复杂了。我真正想做的就是在需要时执行一系列检查。我重新设计了模块,使其看起来像这样:
health_checks = []
def register_health_check(checker):
health_checks.append(checker)
def _check_all(sc):
for hc in health_checks:
hc.time_passed += 1
if hc.time_passed == hc.interval_secs:
hc.reset()
if not hc.is_healthy():
hc.on_fail()
sc.enter(1, 1, _check_all, (sc,))
def start():
s = sched.scheduler(time.time, time.sleep)
s.enter(1, 1, _check_all, (s,))
s.run()
我正在编写一个小应用程序来验证一些网络应用程序是否正常。为此,我编写了一个简单的 HealthCheck
摘要 class,它需要两个函数,is_healthy
和 on_fail
。这是我注册两次健康检查的代码:
import threading
import sched
import time
def register_health_check(checker, interval):
s = sched.scheduler(time.time, time.sleep)
s.enter(interval, 1, _check, (s,checker,interval))
t = threading.Thread(target=s.run, daemon=True)
t.start()
def _check(sc, checker, interval):
if checker.is_healthy():
sc.enter(interval, 1, _check, (sc,checker,interval))
else:
checker.on_fail()
然后,要注册健康检查,我会这样做:
register_health_check(CheckMain('foo'), 1)
register_health_check(CheckMain('bar'), 5)
CheckMain
的 is_healthy
只是分别打印 checking foo
或 checking bar
。我期望的输出是:
checking foo // 1 second
checking foo // 2 seconds
checking foo // 3 seconds
checking foo // 4 seconds
checking foo // 5 seconds
checking bar // also 5 seconds-ish
checking foo // 6 seconds
checking foo // etc.
...
实际输出为:
checking foo
checking foo
checking foo
checking foo
checking foo
checking foo
checking foo
checking foo
checking bar
checking foo
checking bar
checking foo
checking foo
checking foo
checking foo
checking foo
checking foo
checking foo
checking foo
我的实现有什么问题?
由于没有人回答,我认为多线程对于这项任务来说太复杂了。我真正想做的就是在需要时执行一系列检查。我重新设计了模块,使其看起来像这样:
health_checks = []
def register_health_check(checker):
health_checks.append(checker)
def _check_all(sc):
for hc in health_checks:
hc.time_passed += 1
if hc.time_passed == hc.interval_secs:
hc.reset()
if not hc.is_healthy():
hc.on_fail()
sc.enter(1, 1, _check_all, (sc,))
def start():
s = sched.scheduler(time.time, time.sleep)
s.enter(1, 1, _check_all, (s,))
s.run()