转换器如何在 core.async 个通道中执行?
How are transducers executed in core.async channels?
将频道设为这样的频道时:
(chan 10 tx)
如果我像这样创建了 10 个通道,然后同时向所有通道发送一条消息,转换器将如何执行。他们会 运行 并发还是在一个线程上?
我认为现在换能器为 运行 时的行为未定义,但查看 ManyToManyChannel 的实现,换能器(即 add!
字段) 在通道写入和读取时都可以调用。
运行一个简单的测试好像是如果channel没满,写线程会执行transducer,但是如果channel满了,有时读线程运行s it.
具有小缓冲区的示例:
(defn thread-name []
(.getName (Thread/currentThread)))
(require '[clojure.core.async :as async :refer [chan <! >! >!! go]])
(defn p [& args]
(locking *out*
(apply println (thread-name) ":" args)))
(defn log [v]
(p "Transforming" v)
v)
(def tx (map log))
(def c (chan 1 tx))
(def c2 (chan 1 tx))
(go
(loop []
(when-let [v (<! c)]
(p "Getting from c1" v)
(<! (async/timeout 100))
(recur))))
(go
(loop []
(when-let [v (<! c2)]
(p "Getting from c2" v)
(<! (async/timeout 100))
(recur))))
(dotimes [_ 5]
(p "Putting in c1" 1)
(>!! c 1)
(p "Putting in c2" 100)
(>!! c2 100))
产生输出:
nREPL-worker-20 : Transforming 1
nREPL-worker-20 : Putting in c2 100
async-dispatch-33 : Getting from c1 1
nREPL-worker-20 : Transforming 100
nREPL-worker-20 : Putting in c1 1
async-dispatch-31 : Getting from c2 100
nREPL-worker-20 : Transforming 1
nREPL-worker-20 : Putting in c2 100
nREPL-worker-20 : Transforming 100
nREPL-worker-20 : Putting in c1 1
async-dispatch-35 : Getting from c2 100
async-dispatch-34 : Transforming 1 <---- In this case is run in the reading side
async-dispatch-34 : Getting from c1 1
nREPL-worker-20 : Putting in c2 100
nREPL-worker-20 : Transforming 100
async-dispatch-37 : Getting from c2 100
async-dispatch-36 : Getting from c1 1
nREPL-worker-20 : Putting in c1 1
将频道设为这样的频道时:
(chan 10 tx)
如果我像这样创建了 10 个通道,然后同时向所有通道发送一条消息,转换器将如何执行。他们会 运行 并发还是在一个线程上?
我认为现在换能器为 运行 时的行为未定义,但查看 ManyToManyChannel 的实现,换能器(即 add!
字段) 在通道写入和读取时都可以调用。
运行一个简单的测试好像是如果channel没满,写线程会执行transducer,但是如果channel满了,有时读线程运行s it.
具有小缓冲区的示例:
(defn thread-name []
(.getName (Thread/currentThread)))
(require '[clojure.core.async :as async :refer [chan <! >! >!! go]])
(defn p [& args]
(locking *out*
(apply println (thread-name) ":" args)))
(defn log [v]
(p "Transforming" v)
v)
(def tx (map log))
(def c (chan 1 tx))
(def c2 (chan 1 tx))
(go
(loop []
(when-let [v (<! c)]
(p "Getting from c1" v)
(<! (async/timeout 100))
(recur))))
(go
(loop []
(when-let [v (<! c2)]
(p "Getting from c2" v)
(<! (async/timeout 100))
(recur))))
(dotimes [_ 5]
(p "Putting in c1" 1)
(>!! c 1)
(p "Putting in c2" 100)
(>!! c2 100))
产生输出:
nREPL-worker-20 : Transforming 1
nREPL-worker-20 : Putting in c2 100
async-dispatch-33 : Getting from c1 1
nREPL-worker-20 : Transforming 100
nREPL-worker-20 : Putting in c1 1
async-dispatch-31 : Getting from c2 100
nREPL-worker-20 : Transforming 1
nREPL-worker-20 : Putting in c2 100
nREPL-worker-20 : Transforming 100
nREPL-worker-20 : Putting in c1 1
async-dispatch-35 : Getting from c2 100
async-dispatch-34 : Transforming 1 <---- In this case is run in the reading side
async-dispatch-34 : Getting from c1 1
nREPL-worker-20 : Putting in c2 100
nREPL-worker-20 : Transforming 100
async-dispatch-37 : Getting from c2 100
async-dispatch-36 : Getting from c1 1
nREPL-worker-20 : Putting in c1 1