草书:Clojure 的 *out*,不同的 Writers,多线程时刷新和排序不一致:发生了什么事?

Cursive: Clojure's *out*, different Writers, flushing and ordering inconsistency when multithreaded: what is going on?

tl;dr 为什么 Clojure 在 newFixedThreadPool 中为线程创建一个单独的 Writer?为什么它可能在池终止后被刷新?为什么只能在 Cursive 中重现该行为?

假设我们有一个应用程序在单独的线程中执行某些操作,并且某些操作写入 stdout。假设在我们完成所有操作后,我们想要打印最后一条消息。

我们首先要 运行 了解 Clojure 的 println,如果提供多个​​参数,将产生交错输出。这涵盖了 here.

不过好像还有一个问题。如果我们 运行 像这样:

(defn main []
  (let [pool (make-pool num-threads)]
    (print-multithreaded pool "Hello, world!")
    (shutdown-pool pool))
  (safe-println "All done, have a nice day."))

我们有时会有

Hello, world!
All done, have a nice day.

有时

All done, have a nice day.
Hello, world!

也许 flush 在每次写入之后?

(defn safe-println [& more]
  (.write *out* (str (clojure.string/join " " more) "\n"))
  (.flush *out*))

无效。 有效的方法是在 System.out 之上诉诸显式 Java 互操作,如下所示:

(defn safe-println [& more]
  (let [writer (System/out)]
    (.println writer (str (clojure.string/join " " more)))
    (.flush writer)))

writer 设为 (PrintWriter. System/out)(OutputStreamWriter. System/out) 也有效。

似乎我们的线程中有不同的 *out*s...确实,

(def out *out*)
(defn safe-println [& more]
  (.write out (str (clojure.string/join " " more) "\n"))
  (.flush out))

有效。

那么问题来了:为什么会这样?对于 Java 个片段,这是有道理的:System.out 是静态最终的,因此所有线程只存在一个实例,并且所有内容都与它对话,因此所有内容都添加到同一个缓冲区。通过打印到 Clojure 的 *out*,主线程和池线程有自己的 *out*,有自己的缓冲区(主线程是 PrintWriter,池线程是共享的 OutputStreamWriter).我真的不明白为什么一开始会这样,我也不明白为什么它会导致不一致的排序:我们明确地完成所有线程 before 调用最终打印,这应该会导致隐式 flush。但即使我们添加显式 flush,结果也保持不变。

我可能在这里遗漏了一些非常明显的细节,如果你能帮助我,我会很高兴。如果您想查看整个可重现的示例(由于篇幅原因我没有在此处包括),这里有一个要点 link:https://gist.github.com/trueneu/b8498aa259899a8fc979090fccf632de

编辑:要点的第一个版本确实有效,你必须修改它才能破坏它,所以我编辑它以从一开始就演示 "incorrect" 行为。

此外,为了消除任何误解,这是 Cursive 的屏幕截图:https://ibb.co/jHqSL0

EDIT2:这是在原始问题中指出的,但我会强调一下。了解这种行为的要点和机制是问题的一半。新 *out* 不是为每个线程创建的。但它似乎正在为线程池创建一个单独的线程池。 (对于此输出,将 num-threads 减少到 1,并将 (.toString *out*) 的打印添加到 safe-println。增加 num-threads 不会产生新的对象地址:

(main)
java.io.PrintWriter@1dcc77c6
All done, have a nice day.
=> nil
java.io.OutputStreamWriter@7104a76f
Hello, world!

EDIT3:在@glts 评论后将 map 更改为 doseq。 此外,当 lein repl 中的 运行 时,它始终会产生正确的输出,这让我更加困惑。因此,正如 David Arenas 指出的那样,行为似乎取决于上游输出处理。然而,问题依然存在。

EDIT4:David Arenas 还在 Cider 中进行了检查,但无法重现该行为。似乎与 Cursive 的 nrepl 输出处理实现有关。

Clojure 的 *out* 不会为每个线程创建一个实例(它也是静态最终的),但它确实使用了没有原子保证的 OutputStreamWriter。由于您正在写入单个流,因此您需要同步缓冲区上的线程。

如果您 运行 您的代码使用 nrepl,您将看到 "correct" 行为。这是因为他们将 out 重新绑定到他们自己的使用锁定缓冲区的编写器。

nrepl 的会话结束:

(defn- session-out
  "Returns a PrintWriter suitable for binding as *out* or *err*.  All of
   the content written to that PrintWriter will (when .flush-ed) be sent on the
   given transport in messages specifying the given session-id.
   `channel-type` should be :out or :err, as appropriate."
  [channel-type session-id transport]
  (let [buf (clojure.tools.nrepl.StdOutBuffer.)]
    (PrintWriter. (proxy [Writer] []
                    (close [] (.flush ^Writer this))
                    (write [& [x ^Integer off ^Integer len]]
                      (locking buf
                        (cond
                          (number? x) (.append buf (char x))
                          (not off) (.append buf x)
                          ; the CharSequence overload of append takes an *end* idx, not length!
                          (instance? CharSequence x) (.append buf ^CharSequence x (int off) (int (+ len off)))
                          :else (.append buf ^chars x off len))
                        (when (<= *out-limit* (.length buf))
                          (.flush ^Writer this))))
                    (flush []
                      (let [text (locking buf (let [text (str buf)]
                                                (.setLength buf 0)
                                                text))]
                        (when (pos? (count text))
                          (t/send (or (:transport *msg*) transport)
                                  (response-for *msg* :session session-id
                                                channel-type text))))))
                  true)))