Clojure:定期轮询数据库 - core.async w/超时通道 VS 香草递归线程 w/睡眠?
Clojure: Polling database periodically - core.async w/ timeout channel VS vanilla recursive Thread w/ sleep?
我有一个基于 Ring 的服务器,它有一个用于存储应用程序状态的原子,对于频繁更改的信息,每 10 秒定期从数据库中获取一次,其余信息每 60 秒一次。
(defn set-world-update-interval
[f time-in-ms]
(let [stop (async/chan)]
(async/go-loop []
(async/alt!
(async/timeout time-in-ms) (do (async/<! (async/thread (f)))
(recur))
stop :stop))
stop))
(mount/defstate world-listener
:start (set-world-update-interval #(do (println "Checking data in db") (reset! world-atom (fetch-world-data)) ) 10000)
:stop (async/close! world-listener))
效果不错。 RAM 使用情况非常稳定。但我想知道这是否是对 core.async?
的不当使用
也许它应该是一个常规线程而不是像这样?
(doto (Thread. (fn []
(loop []
(Thread/sleep 1000)
(println "Checking data in db")
(reset! world-atom (fetch-world-data))
(recur))))
(.setUncaughtExceptionHandler
(reify Thread$UncaughtExceptionHandler
(uncaughtException [this thread exception]
(println "Cleaning up!"))))
(.start))
我觉得使用 go-loop
创建一个超时停放的 goroutine 有点傻,而你真正想做的所有工作都是 IO 密集型的,因此(正确地)在一个单独的线程。这样做的结果是您每个周期都在经历线程。这些线程由 core.async 池化,因此您无需执行从无到有创建新线程的昂贵工作,但将它们从池中取出并与 core.async 交互超时仍然存在一些开销.我只是为了简单起见,将 core.async 排除在该操作之外。
虽然您的 core.async
实施此模式没有任何问题,但我建议为此使用 java.util.concurrent.ScheduledExecutorService
。它使您可以精确控制线程池和调度。
尝试这样的事情:
(ns your-app.world
(:require [clojure.tools.logging :as log]
[mount.core :as mount])
(:import
(java.util.concurrent Executors ScheduledExecutorService ThreadFactory TimeUnit)))
(defn ^ThreadFactory create-thread-factory
[thread-name-prefix]
(let [thread-number (atom 0)]
(reify ThreadFactory
(newThread [_ runnable]
(Thread. runnable (str thread-name-prefix "-" (swap! thread-number inc)))))))
(defn ^ScheduledExecutorService create-single-thread-scheduled-executor
[thread-name-prefix]
(let [thread-factory (create-thread-factory thread-name-prefix)]
(Executors/newSingleThreadScheduledExecutor thread-factory)))
(defn schedule
[executor runnable interval unit]
(.scheduleWithFixedDelay executor runnable 0 interval unit))
(defn shutdown-executor
"Industrial-strength executor shutdown, modify/simplify according to need."
[^ScheduledExecutorService executor]
(if (.isShutdown executor)
(log/info "Executor already shut down")
(do
(log/info "Shutting down executor")
(.shutdown executor) ;; Disable new tasks from being scheduled
(try
;; Wait a while for currently running tasks to finish
(if-not (.awaitTermination executor 10 TimeUnit/SECONDS)
(do
(.shutdownNow executor) ;; Cancel currently running tasks
(log/info "Still waiting to shut down executor. Sending interrupt to tasks.")
;; Wait a while for tasks to respond to being cancelled
(when-not (.awaitTermination executor 10 TimeUnit/SECONDS)
(throw (ex-info "Executor could not be shut down" {}))))
(log/info "Executor shutdown completed"))
(catch InterruptedException _
(log/info "Interrupted while shutting down. Sending interrupt to tasks.")
;; Re-cancel if current thread also interrupted
(.shutdownNow executor)
;; Preserve interrupt status
(.interrupt (Thread/currentThread)))))))
(defn world-updating-fn
[]
(log/info "Updating world atom")
;; Do your thing here
)
(mount/defstate world-listener
:start (doto (create-single-thread-scheduled-executor "world-listener")
(schedule world-updating-fn 10 TimeUnit/MINUTES))
:stop (shutdown-executor world-listener))
我有一个基于 Ring 的服务器,它有一个用于存储应用程序状态的原子,对于频繁更改的信息,每 10 秒定期从数据库中获取一次,其余信息每 60 秒一次。
(defn set-world-update-interval
[f time-in-ms]
(let [stop (async/chan)]
(async/go-loop []
(async/alt!
(async/timeout time-in-ms) (do (async/<! (async/thread (f)))
(recur))
stop :stop))
stop))
(mount/defstate world-listener
:start (set-world-update-interval #(do (println "Checking data in db") (reset! world-atom (fetch-world-data)) ) 10000)
:stop (async/close! world-listener))
效果不错。 RAM 使用情况非常稳定。但我想知道这是否是对 core.async?
的不当使用也许它应该是一个常规线程而不是像这样?
(doto (Thread. (fn []
(loop []
(Thread/sleep 1000)
(println "Checking data in db")
(reset! world-atom (fetch-world-data))
(recur))))
(.setUncaughtExceptionHandler
(reify Thread$UncaughtExceptionHandler
(uncaughtException [this thread exception]
(println "Cleaning up!"))))
(.start))
我觉得使用 go-loop
创建一个超时停放的 goroutine 有点傻,而你真正想做的所有工作都是 IO 密集型的,因此(正确地)在一个单独的线程。这样做的结果是您每个周期都在经历线程。这些线程由 core.async 池化,因此您无需执行从无到有创建新线程的昂贵工作,但将它们从池中取出并与 core.async 交互超时仍然存在一些开销.我只是为了简单起见,将 core.async 排除在该操作之外。
虽然您的 core.async
实施此模式没有任何问题,但我建议为此使用 java.util.concurrent.ScheduledExecutorService
。它使您可以精确控制线程池和调度。
尝试这样的事情:
(ns your-app.world
(:require [clojure.tools.logging :as log]
[mount.core :as mount])
(:import
(java.util.concurrent Executors ScheduledExecutorService ThreadFactory TimeUnit)))
(defn ^ThreadFactory create-thread-factory
[thread-name-prefix]
(let [thread-number (atom 0)]
(reify ThreadFactory
(newThread [_ runnable]
(Thread. runnable (str thread-name-prefix "-" (swap! thread-number inc)))))))
(defn ^ScheduledExecutorService create-single-thread-scheduled-executor
[thread-name-prefix]
(let [thread-factory (create-thread-factory thread-name-prefix)]
(Executors/newSingleThreadScheduledExecutor thread-factory)))
(defn schedule
[executor runnable interval unit]
(.scheduleWithFixedDelay executor runnable 0 interval unit))
(defn shutdown-executor
"Industrial-strength executor shutdown, modify/simplify according to need."
[^ScheduledExecutorService executor]
(if (.isShutdown executor)
(log/info "Executor already shut down")
(do
(log/info "Shutting down executor")
(.shutdown executor) ;; Disable new tasks from being scheduled
(try
;; Wait a while for currently running tasks to finish
(if-not (.awaitTermination executor 10 TimeUnit/SECONDS)
(do
(.shutdownNow executor) ;; Cancel currently running tasks
(log/info "Still waiting to shut down executor. Sending interrupt to tasks.")
;; Wait a while for tasks to respond to being cancelled
(when-not (.awaitTermination executor 10 TimeUnit/SECONDS)
(throw (ex-info "Executor could not be shut down" {}))))
(log/info "Executor shutdown completed"))
(catch InterruptedException _
(log/info "Interrupted while shutting down. Sending interrupt to tasks.")
;; Re-cancel if current thread also interrupted
(.shutdownNow executor)
;; Preserve interrupt status
(.interrupt (Thread/currentThread)))))))
(defn world-updating-fn
[]
(log/info "Updating world atom")
;; Do your thing here
)
(mount/defstate world-listener
:start (doto (create-single-thread-scheduled-executor "world-listener")
(schedule world-updating-fn 10 TimeUnit/MINUTES))
:stop (shutdown-executor world-listener))