为什么我使用通道 sub/unsub 的以下代码会发生内存泄漏?
Why do I have memory leak for the following code with channel sub/unsub?
我正在使用 [org.clojure/clojure "1.10.1"],[org.clojure/core.async "1.2.603"]
和最新的 Amazon Corretto 11 JVM(如果与它们有关的话)。
以下代码是生产中使用的代码的简化版本,它确实会导致内存泄漏。我不知道为什么会这样,但怀疑可能是 sub/unsub 个频道造成的。任何人都可以帮助指出我的代码可能出错的地方或我如何修复内存泄漏吗?
(ns test-gc.core
(:require [clojure.core.async :as a :refer [chan put! close! <! go >! go-loop timeout]])
(:import [java.util UUID]))
(def global-msg-ch (chan (a/sliding-buffer 200)))
(def global-msg-pub (a/pub global-msg-ch :id))
(defn io-promise []
(let [id (UUID/randomUUID)
ch (chan)]
(a/sub global-msg-pub id ch)
[id (go
(let [x (<! ch)]
(a/unsub global-msg-pub id ch)
(:data x)))]))
(defn -main []
(go-loop []
(<! (timeout 1))
(let [[pid pch] (io-promise)
cmd {:id pid
:data (rand-int 1E5)}]
(>! global-msg-ch cmd)
(println (<! pch)))
(recur))
(while true
(Thread/yield)))
快速堆转储给出了以下统计信息,例如:
Class 按实例数
java.util.LinkedList
5,157,128 (14.4%)
java.util.concurrent.atomic.AtomicReference
3,698,382 (10.3%)
clojure.lang.Atom
3,094,279 (8.6%)
- ...
Class 按实例大小
java.lang.Object[]
210,061,752 B (13.8%)
java.util.LinkedList
206,285,120 B (13.6%)
clojure.lang.Atom
148,525,392 B (9.8%)
clojure.core.async.impl.channels.ManyToManyChannel
132,022,336 B (8.7%)
- ...
我终于明白为什么了。通过查看源码,我们得到如下片段:
(defn pub
"Creates and returns a pub(lication) of the supplied channel, ..."
...
(let [mults (atom {}) ;;topic->mult
ensure-mult (fn [topic]
(or (get @mults topic)
(get (swap! mults
#(if (% topic) % (assoc % topic (mult (chan (buf-fn topic))))))
topic)))
p (reify
Mux
(muxch* [_] ch)
Pub
(sub* [p topic ch close?]
(let [m (ensure-mult topic)]
(tap m ch close?)))
(unsub* [p topic ch]
(when-let [m (get @mults topic)]
(untap m ch)))
(unsub-all* [_] (reset! mults {}))
(unsub-all* [_ topic] (swap! mults dissoc topic)))]
...
p)))
我们可以看到 mults
存储所有 topic
因此如果我们不清除它,它将单调增加。我们可能会添加类似 (a/unsub-all* global-msg-pub pid)
的内容来解决这个问题。
我正在使用 [org.clojure/clojure "1.10.1"],[org.clojure/core.async "1.2.603"]
和最新的 Amazon Corretto 11 JVM(如果与它们有关的话)。
以下代码是生产中使用的代码的简化版本,它确实会导致内存泄漏。我不知道为什么会这样,但怀疑可能是 sub/unsub 个频道造成的。任何人都可以帮助指出我的代码可能出错的地方或我如何修复内存泄漏吗?
(ns test-gc.core
(:require [clojure.core.async :as a :refer [chan put! close! <! go >! go-loop timeout]])
(:import [java.util UUID]))
(def global-msg-ch (chan (a/sliding-buffer 200)))
(def global-msg-pub (a/pub global-msg-ch :id))
(defn io-promise []
(let [id (UUID/randomUUID)
ch (chan)]
(a/sub global-msg-pub id ch)
[id (go
(let [x (<! ch)]
(a/unsub global-msg-pub id ch)
(:data x)))]))
(defn -main []
(go-loop []
(<! (timeout 1))
(let [[pid pch] (io-promise)
cmd {:id pid
:data (rand-int 1E5)}]
(>! global-msg-ch cmd)
(println (<! pch)))
(recur))
(while true
(Thread/yield)))
快速堆转储给出了以下统计信息,例如:
Class 按实例数
java.util.LinkedList
5,157,128 (14.4%)java.util.concurrent.atomic.AtomicReference
3,698,382 (10.3%)clojure.lang.Atom
3,094,279 (8.6%)- ...
Class 按实例大小
java.lang.Object[]
210,061,752 B (13.8%)java.util.LinkedList
206,285,120 B (13.6%)clojure.lang.Atom
148,525,392 B (9.8%)clojure.core.async.impl.channels.ManyToManyChannel
132,022,336 B (8.7%)- ...
我终于明白为什么了。通过查看源码,我们得到如下片段:
(defn pub
"Creates and returns a pub(lication) of the supplied channel, ..."
...
(let [mults (atom {}) ;;topic->mult
ensure-mult (fn [topic]
(or (get @mults topic)
(get (swap! mults
#(if (% topic) % (assoc % topic (mult (chan (buf-fn topic))))))
topic)))
p (reify
Mux
(muxch* [_] ch)
Pub
(sub* [p topic ch close?]
(let [m (ensure-mult topic)]
(tap m ch close?)))
(unsub* [p topic ch]
(when-let [m (get @mults topic)]
(untap m ch)))
(unsub-all* [_] (reset! mults {}))
(unsub-all* [_ topic] (swap! mults dissoc topic)))]
...
p)))
我们可以看到 mults
存储所有 topic
因此如果我们不清除它,它将单调增加。我们可能会添加类似 (a/unsub-all* global-msg-pub pid)
的内容来解决这个问题。