Clojure:替代使用 mutex/lock 和计数器
Clojure: alternative to using a mutex/lock and a counter
场景:我有一个服务器侦听六个活动的 TCP/IP 连接。当收到 "ready" 消息时,将在它自己的线程上引发一个事件。当服务器收到来自每个连接的"ready"消息时,它需要运行"start"函数。
我的面向对象解决方案可能涉及使用互斥锁和计数器。类似于:
int _countDown= 6;
object _lock;
void ReadyMessageReceivedForTheFirstTimeFromAConnection() {
lock(_lock) {
--_countDown; //
if (_countDown==0) Start();
}
}
如何在不求助于 locks/mutexes 的情况下在 Clojure 中解决这个问题?
您可以为此目的使用 CountDownLatch 或 Phaser。
在我的期货库中,迫在眉睫,我都用了。 CountDownLatch first and then replaced it with a Phaser for ForkJoin compatibility (might not be necessary in your case). You can see the change in this diff。希望它能让您了解两者的用法。
对于闩锁,总体思路是:
(let [latch (CountDownLatch. 6)]
(on-receive-message this (fn [_] (.countDown latch)))
(.await latch)
...或类似的东西。
如果您更喜欢纯 clojure 版本,则可以使用 promise 来试一试您的 futures。
每次收到消息时,您都会增加 conn-count
手表检查是否达到阈值并交付 :go to the barrier promise.
(def wait-barrier (promise))
(def conn-count (atom 0))
(add-watch conn-count :barrier-watch
(fn [key ref old-state new-state]
(when (== new-state 6)
(deliver wait-barrier :go))))
虚拟示例:
(def wait-barrier (promise))
(def conn-count (atom 0))
(defn worker-dummy []
(when (= @wait-barrier :go)
(println "I'm a worker")))
(defn dummy-receive-msg []
(doall (repeatedly 6,
(fn []
(println "received msg")
(swap! conn-count inc)))))
(let [workers (doall (repeatedly 6 (fn [] (future (worker-dummy)))))]
(add-watch conn-count :barrier-watch
(fn [key ref old-state new-state]
(when (== new-state 6)
(deliver wait-barrier :go))))
(dummy-receive-msg)
(doall (map deref workers)))
因为到目前为止还没有提到:你可以很容易地用 core.async. Have a look at this MCVE:
(let [conn-count 6
ready-chan (chan)]
;; Spawn a thread for each connection.
(doseq [conn-id (range conn-count)]
(thread
(Thread/sleep (rand-int 2000))
(>!! ready-chan conn-id)))
;; Block until all connections are established.
(doseq [total (range 1 (inc conn-count))]
(println (<!! ready-chan) "connected," total "overall"))
;; Invoke start afterwards.
(println "start"))
;; 5 connected, 1 overall
;; 3 connected, 2 overall
;; 4 connected, 3 overall
;; 0 connected, 4 overall
;; 1 connected, 5 overall
;; 2 connected, 6 overall
;; start
;;=> nil
您还可以使用通道来实现倒计时锁存器(借用自 Christophe Grand):
(defn count-down-latch-chan [n]
(chan 1 (comp (drop (dec n)) (take 1))))
有关 core.async 的简短介绍,请查看 "Clojure for the Brave and True" 中的 this Gist. For a longer one, read the corresponding chapter。
场景:我有一个服务器侦听六个活动的 TCP/IP 连接。当收到 "ready" 消息时,将在它自己的线程上引发一个事件。当服务器收到来自每个连接的"ready"消息时,它需要运行"start"函数。
我的面向对象解决方案可能涉及使用互斥锁和计数器。类似于:
int _countDown= 6;
object _lock;
void ReadyMessageReceivedForTheFirstTimeFromAConnection() {
lock(_lock) {
--_countDown; //
if (_countDown==0) Start();
}
}
如何在不求助于 locks/mutexes 的情况下在 Clojure 中解决这个问题?
您可以为此目的使用 CountDownLatch 或 Phaser。
在我的期货库中,迫在眉睫,我都用了。 CountDownLatch first and then replaced it with a Phaser for ForkJoin compatibility (might not be necessary in your case). You can see the change in this diff。希望它能让您了解两者的用法。
对于闩锁,总体思路是:
(let [latch (CountDownLatch. 6)]
(on-receive-message this (fn [_] (.countDown latch)))
(.await latch)
...或类似的东西。
如果您更喜欢纯 clojure 版本,则可以使用 promise 来试一试您的 futures。
每次收到消息时,您都会增加 conn-count 手表检查是否达到阈值并交付 :go to the barrier promise.
(def wait-barrier (promise))
(def conn-count (atom 0))
(add-watch conn-count :barrier-watch
(fn [key ref old-state new-state]
(when (== new-state 6)
(deliver wait-barrier :go))))
虚拟示例:
(def wait-barrier (promise))
(def conn-count (atom 0))
(defn worker-dummy []
(when (= @wait-barrier :go)
(println "I'm a worker")))
(defn dummy-receive-msg []
(doall (repeatedly 6,
(fn []
(println "received msg")
(swap! conn-count inc)))))
(let [workers (doall (repeatedly 6 (fn [] (future (worker-dummy)))))]
(add-watch conn-count :barrier-watch
(fn [key ref old-state new-state]
(when (== new-state 6)
(deliver wait-barrier :go))))
(dummy-receive-msg)
(doall (map deref workers)))
因为到目前为止还没有提到:你可以很容易地用 core.async. Have a look at this MCVE:
(let [conn-count 6
ready-chan (chan)]
;; Spawn a thread for each connection.
(doseq [conn-id (range conn-count)]
(thread
(Thread/sleep (rand-int 2000))
(>!! ready-chan conn-id)))
;; Block until all connections are established.
(doseq [total (range 1 (inc conn-count))]
(println (<!! ready-chan) "connected," total "overall"))
;; Invoke start afterwards.
(println "start"))
;; 5 connected, 1 overall
;; 3 connected, 2 overall
;; 4 connected, 3 overall
;; 0 connected, 4 overall
;; 1 connected, 5 overall
;; 2 connected, 6 overall
;; start
;;=> nil
您还可以使用通道来实现倒计时锁存器(借用自 Christophe Grand):
(defn count-down-latch-chan [n]
(chan 1 (comp (drop (dec n)) (take 1))))
有关 core.async 的简短介绍,请查看 "Clojure for the Brave and True" 中的 this Gist. For a longer one, read the corresponding chapter。