如何从 clojure 中的子进程执行非阻塞读取标准输出?
How to perform non-blocking reading stdout from a subprocess in clojure?
我希望从 clojure 和
通过标准流与此进程通信。
使用 conch 库,我可以
生成并读取进程,并从 out
流中读取数据:
(def my-process (sh/proc "my_dumb_process"))
; read 10 lines from my-process's stdout. Will block until 10 lines taken
(take 10 (line-seq (clojure.java.io/reader (:out p))))
我想在 my-process 打印时调用异步回调
到 stdout - 只要 stdout 流中有数据可用。
我对 clojure 有点陌生 - 有没有惯用的 clojur-ey 方法
这个?我浏览了 core.async 很好,但我找不到
流的非阻塞解决方案。
用于我们目的的示例 shell 脚本(确保使其可执行),将其放在您的 clojure 项目的根目录中以便于测试:
$ cat dumb.sh
#!/bin/bash
for i in 1 2 3 4 5
do
echo "Loop iteration $i"
sleep 2
done
现在我们将定义要执行的进程,启动它并获取标准输出 ((.getInputStream process)
),一次读取一行并循环直到完成。实时读取。
(defn run-proc
[proc-name arg-string callback]
(let [pbuilder (ProcessBuilder. (into-array String [proc-name arg-string]))
process (.start pbuilder)]
(with-open [reader (clojure.java.io/reader (.getInputStream process))]
(loop []
(when-let [line (.readLine ^java.io.BufferedReader reader)]
(callback line)
(recur))))))
测试:
(run-proc "./dumb.sh" "" println)
About to start...
Loop iteration 1
Loop iteration 2
Loop iteration 3
Loop iteration 4
Loop iteration 5
=> nil
这个函数会阻塞,调用你的 callback
;如果你想让它在一个单独的线程中 运行,你可以在 future
中包装:
(future (callback line))
对于基于 core.async 的方法:
(defn run-proc-async
[proc-name arg-string callback]
(let [ch (async/chan 1000 (map callback))]
(async/thread
(let [pbuilder (ProcessBuilder. (into-array String [proc-name arg-string]))
process (.start pbuilder)]
(with-open [reader (clojure.java.io/reader (.getInputStream process))]
(loop []
(when-let [line (.readLine ^java.io.BufferedReader reader)]
(async/>!! ch line)
(recur))))))
ch))
这会将您的 callback
函数作为传感器应用到通道上,结果将放置在函数 returns:
所在的通道上
(run-proc-async "./dumb.sh" "" #(let [cnt (count %)]
(println "Counted" cnt "characters")
cnt))
#object[clojure.core.async.impl.channels.ManyToManyChannel ...]
Counted 16 characters
Counted 16 characters
Counted 16 characters
Counted 16 characters
Counted 16 characters
(async/<!! *1)
=> 16
在此示例中,通道上有 1000 的缓冲区。因此,除非您开始从通道获取,否则对 >!!
的调用将在读取 1000 行后阻塞。您也可以将 put!
与回调一起使用,但这里有一个内置的 1024 限制,无论如何您都应该处理结果。
如果您不介意使用库,可以使用 lazy-gen
和 yield
from the Tupelo library 找到一个简单的解决方案。它的工作方式类似于 生成器函数 in Python:
(ns tst.demo.core
(:use demo.core tupelo.test)
(:require
[clojure.java.io :as io]
[tupelo.core :as t]
[me.raynes.conch.low-level :as cll]
))
(t/refer-tupelo)
(dotest
(let [proc (cll/proc "dumb.sh")
>> (pretty proc)
out-lines (line-seq (io/reader (grab :out proc)))
lazy-line-seq (lazy-gen
(doseq [line out-lines]
(yield line))) ]
(doseq [curr-line lazy-line-seq]
(spyx curr-line))))
使用与之前相同的 dumb.sh
,它会产生以下输出:
{:out #object[java.lang.UNIXProcess$ProcessPipeInputStream 0x465b16bb "java.lang.UNIXProcess$ProcessPipeInputStream@465b16bb"],
:in #object[java.lang.UNIXProcess$ProcessPipeOutputStream 0xfafbc63 "java.lang.UNIXProcess$ProcessPipeOutputStream@fafbc63"],
:err #object[java.lang.UNIXProcess$ProcessPipeInputStream 0x59bb8f80 "java.lang.UNIXProcess$ProcessPipeInputStream@59bb8f80"],
:process #object[java.lang.UNIXProcess 0x553c74cc "java.lang.UNIXProcess@553c74cc"]}
; one of these is printed every 2 seconds
curr-line => "Loop iteration 1"
curr-line => "Loop iteration 2"
curr-line => "Loop iteration 3"
curr-line => "Loop iteration 4"
curr-line => "Loop iteration 5"
lazy-gen
中的所有内容都是 运行 在单独的线程中 使用 core.async
。 doseq
急切地消耗进程输出并使用 yield
将其放在输出惰性序列上。第 2 个 doseq
在当前线程 中急切地消耗 lazy-gen
的结果,并在每一行都打印出来可用。
备选方案:
一个更简单的解决方案是像这样简单地使用未来:
(dotest
(let [proc (cll/proc "dumb.sh")
out-lines (line-seq (io/reader (grab :out proc))) ]
(future
(doseq [curr-line out-lines]
(spyx curr-line)))))
结果相同:
curr-line => "Loop iteration 1"
curr-line => "Loop iteration 2"
curr-line => "Loop iteration 3"
curr-line => "Loop iteration 4"
curr-line => "Loop iteration 5"
我希望从 clojure 和 通过标准流与此进程通信。
使用 conch 库,我可以
生成并读取进程,并从 out
流中读取数据:
(def my-process (sh/proc "my_dumb_process"))
; read 10 lines from my-process's stdout. Will block until 10 lines taken
(take 10 (line-seq (clojure.java.io/reader (:out p))))
我想在 my-process 打印时调用异步回调 到 stdout - 只要 stdout 流中有数据可用。
我对 clojure 有点陌生 - 有没有惯用的 clojur-ey 方法 这个?我浏览了 core.async 很好,但我找不到 流的非阻塞解决方案。
用于我们目的的示例 shell 脚本(确保使其可执行),将其放在您的 clojure 项目的根目录中以便于测试:
$ cat dumb.sh
#!/bin/bash
for i in 1 2 3 4 5
do
echo "Loop iteration $i"
sleep 2
done
现在我们将定义要执行的进程,启动它并获取标准输出 ((.getInputStream process)
),一次读取一行并循环直到完成。实时读取。
(defn run-proc
[proc-name arg-string callback]
(let [pbuilder (ProcessBuilder. (into-array String [proc-name arg-string]))
process (.start pbuilder)]
(with-open [reader (clojure.java.io/reader (.getInputStream process))]
(loop []
(when-let [line (.readLine ^java.io.BufferedReader reader)]
(callback line)
(recur))))))
测试:
(run-proc "./dumb.sh" "" println)
About to start...
Loop iteration 1
Loop iteration 2
Loop iteration 3
Loop iteration 4
Loop iteration 5
=> nil
这个函数会阻塞,调用你的 callback
;如果你想让它在一个单独的线程中 运行,你可以在 future
中包装:
(future (callback line))
对于基于 core.async 的方法:
(defn run-proc-async
[proc-name arg-string callback]
(let [ch (async/chan 1000 (map callback))]
(async/thread
(let [pbuilder (ProcessBuilder. (into-array String [proc-name arg-string]))
process (.start pbuilder)]
(with-open [reader (clojure.java.io/reader (.getInputStream process))]
(loop []
(when-let [line (.readLine ^java.io.BufferedReader reader)]
(async/>!! ch line)
(recur))))))
ch))
这会将您的 callback
函数作为传感器应用到通道上,结果将放置在函数 returns:
(run-proc-async "./dumb.sh" "" #(let [cnt (count %)]
(println "Counted" cnt "characters")
cnt))
#object[clojure.core.async.impl.channels.ManyToManyChannel ...]
Counted 16 characters
Counted 16 characters
Counted 16 characters
Counted 16 characters
Counted 16 characters
(async/<!! *1)
=> 16
在此示例中,通道上有 1000 的缓冲区。因此,除非您开始从通道获取,否则对 >!!
的调用将在读取 1000 行后阻塞。您也可以将 put!
与回调一起使用,但这里有一个内置的 1024 限制,无论如何您都应该处理结果。
如果您不介意使用库,可以使用 lazy-gen
和 yield
from the Tupelo library 找到一个简单的解决方案。它的工作方式类似于 生成器函数 in Python:
(ns tst.demo.core
(:use demo.core tupelo.test)
(:require
[clojure.java.io :as io]
[tupelo.core :as t]
[me.raynes.conch.low-level :as cll]
))
(t/refer-tupelo)
(dotest
(let [proc (cll/proc "dumb.sh")
>> (pretty proc)
out-lines (line-seq (io/reader (grab :out proc)))
lazy-line-seq (lazy-gen
(doseq [line out-lines]
(yield line))) ]
(doseq [curr-line lazy-line-seq]
(spyx curr-line))))
使用与之前相同的 dumb.sh
,它会产生以下输出:
{:out #object[java.lang.UNIXProcess$ProcessPipeInputStream 0x465b16bb "java.lang.UNIXProcess$ProcessPipeInputStream@465b16bb"],
:in #object[java.lang.UNIXProcess$ProcessPipeOutputStream 0xfafbc63 "java.lang.UNIXProcess$ProcessPipeOutputStream@fafbc63"],
:err #object[java.lang.UNIXProcess$ProcessPipeInputStream 0x59bb8f80 "java.lang.UNIXProcess$ProcessPipeInputStream@59bb8f80"],
:process #object[java.lang.UNIXProcess 0x553c74cc "java.lang.UNIXProcess@553c74cc"]}
; one of these is printed every 2 seconds
curr-line => "Loop iteration 1"
curr-line => "Loop iteration 2"
curr-line => "Loop iteration 3"
curr-line => "Loop iteration 4"
curr-line => "Loop iteration 5"
lazy-gen
中的所有内容都是 运行 在单独的线程中 使用 core.async
。 doseq
急切地消耗进程输出并使用 yield
将其放在输出惰性序列上。第 2 个 doseq
在当前线程 中急切地消耗 lazy-gen
的结果,并在每一行都打印出来可用。
备选方案:
一个更简单的解决方案是像这样简单地使用未来:
(dotest
(let [proc (cll/proc "dumb.sh")
out-lines (line-seq (io/reader (grab :out proc))) ]
(future
(doseq [curr-line out-lines]
(spyx curr-line)))))
结果相同:
curr-line => "Loop iteration 1"
curr-line => "Loop iteration 2"
curr-line => "Loop iteration 3"
curr-line => "Loop iteration 4"
curr-line => "Loop iteration 5"