草书:Clojure 的 *out*,不同的 Writers,多线程时刷新和排序不一致:发生了什么事?
Cursive: Clojure's *out*, different Writers, flushing and ordering inconsistency when multithreaded: what is going on?
tl;dr 为什么 Clojure 在 newFixedThreadPool
中为线程创建一个单独的 Writer
?为什么它可能在池终止后被刷新?为什么只能在 Cursive 中重现该行为?
假设我们有一个应用程序在单独的线程中执行某些操作,并且某些操作写入 stdout
。假设在我们完成所有操作后,我们想要打印最后一条消息。
我们首先要 运行 了解 Clojure 的 println
,如果提供多个参数,将产生交错输出。这涵盖了 here.
不过好像还有一个问题。如果我们 运行 像这样:
(defn main []
(let [pool (make-pool num-threads)]
(print-multithreaded pool "Hello, world!")
(shutdown-pool pool))
(safe-println "All done, have a nice day."))
我们有时会有
Hello, world!
All done, have a nice day.
有时
All done, have a nice day.
Hello, world!
也许 flush
在每次写入之后?
(defn safe-println [& more]
(.write *out* (str (clojure.string/join " " more) "\n"))
(.flush *out*))
无效。
有效的方法是在 System.out
之上诉诸显式 Java 互操作,如下所示:
(defn safe-println [& more]
(let [writer (System/out)]
(.println writer (str (clojure.string/join " " more)))
(.flush writer)))
将 writer
设为 (PrintWriter. System/out)
或 (OutputStreamWriter. System/out)
也有效。
似乎我们的线程中有不同的 *out*
s...确实,
(def out *out*)
(defn safe-println [& more]
(.write out (str (clojure.string/join " " more) "\n"))
(.flush out))
有效。
那么问题来了:为什么会这样?对于 Java 个片段,这是有道理的:System.out
是静态最终的,因此所有线程只存在一个实例,并且所有内容都与它对话,因此所有内容都添加到同一个缓冲区。通过打印到 Clojure 的 *out*
,主线程和池线程有自己的 *out*
,有自己的缓冲区(主线程是 PrintWriter
,池线程是共享的 OutputStreamWriter
).我真的不明白为什么一开始会这样,我也不明白为什么它会导致不一致的排序:我们明确地完成所有线程 before 调用最终打印,这应该会导致隐式 flush
。但即使我们添加显式 flush
,结果也保持不变。
我可能在这里遗漏了一些非常明显的细节,如果你能帮助我,我会很高兴。如果您想查看整个可重现的示例(由于篇幅原因我没有在此处包括),这里有一个要点 link:https://gist.github.com/trueneu/b8498aa259899a8fc979090fccf632de
编辑:要点的第一个版本确实有效,你必须修改它才能破坏它,所以我编辑它以从一开始就演示 "incorrect" 行为。
此外,为了消除任何误解,这是 Cursive 的屏幕截图:https://ibb.co/jHqSL0
EDIT2:这是在原始问题中指出的,但我会强调一下。了解这种行为的要点和机制是问题的一半。新 *out*
不是为每个线程创建的。但它似乎正在为线程池创建一个单独的线程池。 (对于此输出,将 num-threads
减少到 1,并将 (.toString *out*)
的打印添加到 safe-println
。增加 num-threads
不会产生新的对象地址:
(main)
java.io.PrintWriter@1dcc77c6
All done, have a nice day.
=> nil
java.io.OutputStreamWriter@7104a76f
Hello, world!
EDIT3:在@glts 评论后将 map
更改为 doseq
。
此外,当 lein repl
中的 运行 时,它始终会产生正确的输出,这让我更加困惑。因此,正如 David Arenas 指出的那样,行为似乎取决于上游输出处理。然而,问题依然存在。
EDIT4:David Arenas 还在 Cider 中进行了检查,但无法重现该行为。似乎与 Cursive 的 nrepl 输出处理实现有关。
Clojure 的 *out*
不会为每个线程创建一个实例(它也是静态最终的),但它确实使用了没有原子保证的 OutputStreamWriter。由于您正在写入单个流,因此您需要同步缓冲区上的线程。
如果您 运行 您的代码使用 nrepl,您将看到 "correct" 行为。这是因为他们将 out 重新绑定到他们自己的使用锁定缓冲区的编写器。
nrepl 的会话结束:
(defn- session-out
"Returns a PrintWriter suitable for binding as *out* or *err*. All of
the content written to that PrintWriter will (when .flush-ed) be sent on the
given transport in messages specifying the given session-id.
`channel-type` should be :out or :err, as appropriate."
[channel-type session-id transport]
(let [buf (clojure.tools.nrepl.StdOutBuffer.)]
(PrintWriter. (proxy [Writer] []
(close [] (.flush ^Writer this))
(write [& [x ^Integer off ^Integer len]]
(locking buf
(cond
(number? x) (.append buf (char x))
(not off) (.append buf x)
; the CharSequence overload of append takes an *end* idx, not length!
(instance? CharSequence x) (.append buf ^CharSequence x (int off) (int (+ len off)))
:else (.append buf ^chars x off len))
(when (<= *out-limit* (.length buf))
(.flush ^Writer this))))
(flush []
(let [text (locking buf (let [text (str buf)]
(.setLength buf 0)
text))]
(when (pos? (count text))
(t/send (or (:transport *msg*) transport)
(response-for *msg* :session session-id
channel-type text))))))
true)))
tl;dr 为什么 Clojure 在 newFixedThreadPool
中为线程创建一个单独的 Writer
?为什么它可能在池终止后被刷新?为什么只能在 Cursive 中重现该行为?
假设我们有一个应用程序在单独的线程中执行某些操作,并且某些操作写入 stdout
。假设在我们完成所有操作后,我们想要打印最后一条消息。
我们首先要 运行 了解 Clojure 的 println
,如果提供多个参数,将产生交错输出。这涵盖了 here.
不过好像还有一个问题。如果我们 运行 像这样:
(defn main []
(let [pool (make-pool num-threads)]
(print-multithreaded pool "Hello, world!")
(shutdown-pool pool))
(safe-println "All done, have a nice day."))
我们有时会有
Hello, world!
All done, have a nice day.
有时
All done, have a nice day.
Hello, world!
也许 flush
在每次写入之后?
(defn safe-println [& more]
(.write *out* (str (clojure.string/join " " more) "\n"))
(.flush *out*))
无效。
有效的方法是在 System.out
之上诉诸显式 Java 互操作,如下所示:
(defn safe-println [& more]
(let [writer (System/out)]
(.println writer (str (clojure.string/join " " more)))
(.flush writer)))
将 writer
设为 (PrintWriter. System/out)
或 (OutputStreamWriter. System/out)
也有效。
似乎我们的线程中有不同的 *out*
s...确实,
(def out *out*)
(defn safe-println [& more]
(.write out (str (clojure.string/join " " more) "\n"))
(.flush out))
有效。
那么问题来了:为什么会这样?对于 Java 个片段,这是有道理的:System.out
是静态最终的,因此所有线程只存在一个实例,并且所有内容都与它对话,因此所有内容都添加到同一个缓冲区。通过打印到 Clojure 的 *out*
,主线程和池线程有自己的 *out*
,有自己的缓冲区(主线程是 PrintWriter
,池线程是共享的 OutputStreamWriter
).我真的不明白为什么一开始会这样,我也不明白为什么它会导致不一致的排序:我们明确地完成所有线程 before 调用最终打印,这应该会导致隐式 flush
。但即使我们添加显式 flush
,结果也保持不变。
我可能在这里遗漏了一些非常明显的细节,如果你能帮助我,我会很高兴。如果您想查看整个可重现的示例(由于篇幅原因我没有在此处包括),这里有一个要点 link:https://gist.github.com/trueneu/b8498aa259899a8fc979090fccf632de
编辑:要点的第一个版本确实有效,你必须修改它才能破坏它,所以我编辑它以从一开始就演示 "incorrect" 行为。
此外,为了消除任何误解,这是 Cursive 的屏幕截图:https://ibb.co/jHqSL0
EDIT2:这是在原始问题中指出的,但我会强调一下。了解这种行为的要点和机制是问题的一半。新 *out*
不是为每个线程创建的。但它似乎正在为线程池创建一个单独的线程池。 (对于此输出,将 num-threads
减少到 1,并将 (.toString *out*)
的打印添加到 safe-println
。增加 num-threads
不会产生新的对象地址:
(main)
java.io.PrintWriter@1dcc77c6
All done, have a nice day.
=> nil
java.io.OutputStreamWriter@7104a76f
Hello, world!
EDIT3:在@glts 评论后将 map
更改为 doseq
。
此外,当 lein repl
中的 运行 时,它始终会产生正确的输出,这让我更加困惑。因此,正如 David Arenas 指出的那样,行为似乎取决于上游输出处理。然而,问题依然存在。
EDIT4:David Arenas 还在 Cider 中进行了检查,但无法重现该行为。似乎与 Cursive 的 nrepl 输出处理实现有关。
Clojure 的 *out*
不会为每个线程创建一个实例(它也是静态最终的),但它确实使用了没有原子保证的 OutputStreamWriter。由于您正在写入单个流,因此您需要同步缓冲区上的线程。
如果您 运行 您的代码使用 nrepl,您将看到 "correct" 行为。这是因为他们将 out 重新绑定到他们自己的使用锁定缓冲区的编写器。
nrepl 的会话结束:
(defn- session-out
"Returns a PrintWriter suitable for binding as *out* or *err*. All of
the content written to that PrintWriter will (when .flush-ed) be sent on the
given transport in messages specifying the given session-id.
`channel-type` should be :out or :err, as appropriate."
[channel-type session-id transport]
(let [buf (clojure.tools.nrepl.StdOutBuffer.)]
(PrintWriter. (proxy [Writer] []
(close [] (.flush ^Writer this))
(write [& [x ^Integer off ^Integer len]]
(locking buf
(cond
(number? x) (.append buf (char x))
(not off) (.append buf x)
; the CharSequence overload of append takes an *end* idx, not length!
(instance? CharSequence x) (.append buf ^CharSequence x (int off) (int (+ len off)))
:else (.append buf ^chars x off len))
(when (<= *out-limit* (.length buf))
(.flush ^Writer this))))
(flush []
(let [text (locking buf (let [text (str buf)]
(.setLength buf 0)
text))]
(when (pos? (count text))
(t/send (or (:transport *msg*) transport)
(response-for *msg* :session session-id
channel-type text))))))
true)))