go块中作业数量的上限?

Upper limit for number of jobs in a go block?

代码如下:

(ns typedclj.async
  (:require [clojure.core.async
             :as a
             :refer [>! <! >!! <!!
                     go chan buffer
                     close! thread
                     alts! alts!! timeout]]
            [clj-http.client :as -cc]))


(time (dorun
        (let [c (chan)]
          (doseq [i (range 10 1e4)]
            (go (>! c i))))))

我收到一个错误:

Exception in thread "async-dispatch-12" java.lang.AssertionError: Assert failed: No more than 1024 pending puts are allowed on a single channel. Consider using a windowed buffer.
(< (.size puts) impl/MAX-QUEUE-SIZE)
    at clojure.core.async.impl.channels.ManyToManyChannel.put_BANG_(channels.clj:150)
    at clojure.core.async.impl.ioc_macros$put_BANG_.invoke(ioc_macros.clj:959)
    at typedclj.async$eval11807$fn__11816$state_machine__6185__auto____11817$fn__11819.invoke(async.clj:19)
    at typedclj.async$eval11807$fn__11816$state_machine__6185__auto____11817.invoke(async.clj:19)
    at clojure.core.async.impl.ioc_macros$run_state_machine.invoke(ioc_macros.clj:940)
    at clojure.core.async.impl.ioc_macros$run_state_machine_wrapped.invoke(ioc_macros.clj:944)
    at typedclj.async$eval11807$fn__11816.invoke(async.clj:19)
    at clojure.lang.AFn.run(AFn.java:22)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)...

根据http://martintrojer.github.io/clojure/2013/07/07/coreasync-and-blocking-io/

... This will break the 1 job = 1 thread knot, thus this thread parking will allow us to scale the number of jobs way beyond any thread limit on the platform (usually around 1000 on the JVM).

core.async gives (blocking) channels and a new (unbounded) thread pool when using 'thread'. This (in effect) is just some sugar over using java threads (or clojure futures) and BlockingQueues from java.util.concurrent. The main feature is go blocks in which threads can be parked and resumed on the (potentially) blocking calls dealing with core.async's channels...

1e4个职位是不是已经太多了?那么上限是多少?

未使用的 puts 的限制是通道缓冲区的大小加上队列的大小。

core.async 中的队列大小限制为 1024,但不应依赖于此。

我平时不这么大声疾呼,所以希望你能原谅我的一次冒犯:

在一个更完美的世界里,每个程序员都会在睡觉前和醒来的第一件事上对自己重复 "there is no such thing as an unbounded queue" 五次。这种思维模式需要弄清楚如何在您的系统中处理背压,以便当过程中某处出现减速时,之前的部分有办法找出它并放慢自己的响应速度。 在 core.async 中默认背压是即时的 因为默认缓冲区大小为零。在有人准备好消费之前,go block 不会成功地将某些东西放入 chan。

chans 基本上是这样的:

"queue of pending puts" --> buffer --> "queue of pending takes"

putter 和 taker 队列旨在为通过此管道通信的两个进程留出时间来安排自己,以便取得进展。没有这些,线程就没有调度空间,就会发生死锁。它们 NOT 旨在用作 作为 缓冲区。这就是中间缓冲区的用途,这就是使它成为唯一具有明确大小的缓冲区的设计。 通过在 chan:

中设置缓冲区的大小,为 您的 系统 显式设置缓冲区大小
user> (time (dorun
        (let [c (chan 1e6)]
          (doseq [i (range 10 1e4)]
            (go (>! c i))))))
"Elapsed time: 83.526679 msecs"
nil

在这种情况下,我 "calculated" 如果有多达一百万个等待作业,我的整个系统将处于良好状态。当然,您在现实世界中的经历会有所不同,并且非常适合您的情况。

感谢您的耐心等待,