core.async 个通道上的换能器
Transducer on core.async channels
如果我有 10 个缓冲订阅频道,每个频道都有一个执行时间为 5 秒的换能器。转换器是并发执行还是全部在同一个线程上执行(假设是多线程上下文)?
它们将并行执行,因此同时在不同的真实线程上执行。但是,您的机器可能没有 10 个内核,因此会有一些停顿和阻塞。您可以在 'Parking and Blocking'.
部分中查看技术细节 here
很容易检查,记录线程,传感器正在执行:
(def log-chan (chan))
(go-loop []
(println (<! log-chan))
(recur))
(def channels
(repeatedly
10
#(chan 10
(map (fn [item]
(let [thread (Thread/currentThread)]
(go (>! log-chan thread)))
item)))))
(doseq [c channels]
(go (>! c :item)))
输出:
#object[java.lang.Thread 0x77a39fa0 Thread[async-dispatch-27,5,main]]
#object[java.lang.Thread 0x7d5bf4d0 Thread[async-dispatch-31,5,main]]
#object[java.lang.Thread 0x53ecb32b Thread[async-dispatch-29,5,main]]
#object[java.lang.Thread 0x2b74f3ac Thread[async-dispatch-25,5,main]]
#object[java.lang.Thread 0x6eb50f9e Thread[async-dispatch-26,5,main]]
#object[java.lang.Thread 0x30701edb Thread[async-dispatch-30,5,main]]
#object[java.lang.Thread 0x1a370b69 Thread[async-dispatch-36,5,main]]
#object[java.lang.Thread 0x3d9884a2 Thread[async-dispatch-24,5,main]]
#object[java.lang.Thread 0x208941d0 Thread[async-dispatch-23,5,main]]
#object[java.lang.Thread 0x2c77aeb Thread[async-dispatch-28,5,main]]
所以你可以看到,有不同的线程。
但它确实依赖于一个 go
块,您可以在其中将数据发送到您的频道,因此如果您更改此设置:
(doseq [c channels]
(go (>! c :item)))
对此:
(go (doseq [c channels]
(>! c :item)))
你得到这个输出:
#object[java.lang.Thread 0x16856295 Thread[async-dispatch-6,5,main]]
#object[java.lang.Thread 0x16856295 Thread[async-dispatch-6,5,main]]
#object[java.lang.Thread 0x16856295 Thread[async-dispatch-6,5,main]]
#object[java.lang.Thread 0x16856295 Thread[async-dispatch-6,5,main]]
#object[java.lang.Thread 0x16856295 Thread[async-dispatch-6,5,main]]
#object[java.lang.Thread 0x16856295 Thread[async-dispatch-6,5,main]]
#object[java.lang.Thread 0x16856295 Thread[async-dispatch-6,5,main]]
#object[java.lang.Thread 0x16856295 Thread[async-dispatch-6,5,main]]
#object[java.lang.Thread 0x16856295 Thread[async-dispatch-6,5,main]]
#object[java.lang.Thread 0x16856295 Thread[async-dispatch-6,5,main]]
因此所有通道的换能器都将使用相同的线程
如果我有 10 个缓冲订阅频道,每个频道都有一个执行时间为 5 秒的换能器。转换器是并发执行还是全部在同一个线程上执行(假设是多线程上下文)?
它们将并行执行,因此同时在不同的真实线程上执行。但是,您的机器可能没有 10 个内核,因此会有一些停顿和阻塞。您可以在 'Parking and Blocking'.
部分中查看技术细节 here很容易检查,记录线程,传感器正在执行:
(def log-chan (chan))
(go-loop []
(println (<! log-chan))
(recur))
(def channels
(repeatedly
10
#(chan 10
(map (fn [item]
(let [thread (Thread/currentThread)]
(go (>! log-chan thread)))
item)))))
(doseq [c channels]
(go (>! c :item)))
输出:
#object[java.lang.Thread 0x77a39fa0 Thread[async-dispatch-27,5,main]]
#object[java.lang.Thread 0x7d5bf4d0 Thread[async-dispatch-31,5,main]]
#object[java.lang.Thread 0x53ecb32b Thread[async-dispatch-29,5,main]]
#object[java.lang.Thread 0x2b74f3ac Thread[async-dispatch-25,5,main]]
#object[java.lang.Thread 0x6eb50f9e Thread[async-dispatch-26,5,main]]
#object[java.lang.Thread 0x30701edb Thread[async-dispatch-30,5,main]]
#object[java.lang.Thread 0x1a370b69 Thread[async-dispatch-36,5,main]]
#object[java.lang.Thread 0x3d9884a2 Thread[async-dispatch-24,5,main]]
#object[java.lang.Thread 0x208941d0 Thread[async-dispatch-23,5,main]]
#object[java.lang.Thread 0x2c77aeb Thread[async-dispatch-28,5,main]]
所以你可以看到,有不同的线程。
但它确实依赖于一个 go
块,您可以在其中将数据发送到您的频道,因此如果您更改此设置:
(doseq [c channels]
(go (>! c :item)))
对此:
(go (doseq [c channels]
(>! c :item)))
你得到这个输出:
#object[java.lang.Thread 0x16856295 Thread[async-dispatch-6,5,main]]
#object[java.lang.Thread 0x16856295 Thread[async-dispatch-6,5,main]]
#object[java.lang.Thread 0x16856295 Thread[async-dispatch-6,5,main]]
#object[java.lang.Thread 0x16856295 Thread[async-dispatch-6,5,main]]
#object[java.lang.Thread 0x16856295 Thread[async-dispatch-6,5,main]]
#object[java.lang.Thread 0x16856295 Thread[async-dispatch-6,5,main]]
#object[java.lang.Thread 0x16856295 Thread[async-dispatch-6,5,main]]
#object[java.lang.Thread 0x16856295 Thread[async-dispatch-6,5,main]]
#object[java.lang.Thread 0x16856295 Thread[async-dispatch-6,5,main]]
#object[java.lang.Thread 0x16856295 Thread[async-dispatch-6,5,main]]
因此所有通道的换能器都将使用相同的线程