core.async 个通道上的换能器

Transducer on core.async channels

如果我有 10 个缓冲订阅频道,每个频道都有一个执行时间为 5 秒的换能器。转换器是并发执行还是全部在同一个线程上执行(假设是多线程上下文)?

它们将并行执行,因此同时在不同的真实线程上执行。但是,您的机器可能没有 10 个内核,因此会有一些停顿和阻塞。您可以在 'Parking and Blocking'.

部分中查看技术细节 here

很容易检查,记录线程,传感器正在执行:

(def log-chan (chan))

(go-loop []
  (println (<! log-chan))
  (recur))

(def channels
  (repeatedly
    10
    #(chan 10
       (map (fn [item]
              (let [thread (Thread/currentThread)]
                (go (>! log-chan thread)))
              item)))))

(doseq [c channels]
  (go (>! c :item)))

输出:

#object[java.lang.Thread 0x77a39fa0 Thread[async-dispatch-27,5,main]]
#object[java.lang.Thread 0x7d5bf4d0 Thread[async-dispatch-31,5,main]]
#object[java.lang.Thread 0x53ecb32b Thread[async-dispatch-29,5,main]]
#object[java.lang.Thread 0x2b74f3ac Thread[async-dispatch-25,5,main]]
#object[java.lang.Thread 0x6eb50f9e Thread[async-dispatch-26,5,main]]
#object[java.lang.Thread 0x30701edb Thread[async-dispatch-30,5,main]]
#object[java.lang.Thread 0x1a370b69 Thread[async-dispatch-36,5,main]]
#object[java.lang.Thread 0x3d9884a2 Thread[async-dispatch-24,5,main]]
#object[java.lang.Thread 0x208941d0 Thread[async-dispatch-23,5,main]]
#object[java.lang.Thread 0x2c77aeb Thread[async-dispatch-28,5,main]]

所以你可以看到,有不同的线程。

但它确实依赖于一个 go 块,您可以在其中将数据发送到您的频道,因此如果您更改此设置:

(doseq [c channels]
  (go (>! c :item)))

对此:

(go (doseq [c channels]
  (>! c :item)))

你得到这个输出:

#object[java.lang.Thread 0x16856295 Thread[async-dispatch-6,5,main]]
#object[java.lang.Thread 0x16856295 Thread[async-dispatch-6,5,main]]
#object[java.lang.Thread 0x16856295 Thread[async-dispatch-6,5,main]]
#object[java.lang.Thread 0x16856295 Thread[async-dispatch-6,5,main]]
#object[java.lang.Thread 0x16856295 Thread[async-dispatch-6,5,main]]
#object[java.lang.Thread 0x16856295 Thread[async-dispatch-6,5,main]]
#object[java.lang.Thread 0x16856295 Thread[async-dispatch-6,5,main]]
#object[java.lang.Thread 0x16856295 Thread[async-dispatch-6,5,main]]
#object[java.lang.Thread 0x16856295 Thread[async-dispatch-6,5,main]]
#object[java.lang.Thread 0x16856295 Thread[async-dispatch-6,5,main]]

因此所有通道的换能器都将使用相同的线程