clojure ref 的奇怪行为

Strange behavior of clojure ref

我有 100 名工人(代理人)共享一个包含任务集合的 ref。虽然这个集合有任务,但每个工作人员从这个集合中获得一个任务(在 dosync 块中),打印它并有时将它放回集合中(在 dosync 块中):

(defn have-tasks?
  [tasks]
  (not (empty? @tasks)))

(defn get-task
  [tasks]
  (dosync
    (let [task (first @tasks)]
      (alter tasks rest)
      task)))

(defn put-task
  [tasks task]
  (dosync (alter tasks conj task))
  nil)

(defn worker
  [& {:keys [tasks]}]
  (agent {:tasks tasks}))

(defn worker-loop
  [{:keys [tasks] :as state}]
  (while (have-tasks? tasks)
    (let [task (get-task tasks)]
      (println "Task: " task)
      (when (< (rand) 0.1)
        (put-task tasks task))))
  state)

(defn create-workers
  [count & options]
  (->> (range 0 count)
       (map (fn [_] (apply worker options)))
       (into [])))

(defn start-workers
  [workers]
  (doseq [worker workers] (send-off worker worker-loop)))

(def tasks (ref (range 1 10000000)))

(def workers (create-workers 100 :tasks tasks))

(start-workers workers)
(apply await workers)

当我运行这段代码时,代理打印的最后一个值是(经过多次尝试): 435445, 4556294, 1322061, 3950017。 但从来没有 9999999 我期望的那样。 每次集合最后都是空的。 我做错了什么?

编辑:

我尽可能简单地重写了工作循环:

(defn worker-loop
  [{:keys [tasks] :as state}]
  (loop []
    (when-let [task (get-task tasks)]
      (println "Task: " task)
      (recur)))
  state)

但是问题依然存在。 当只创建一个工人时,此代码的行为符合预期。

当达到范围内的最后一个号码时,工作人员还持有一个更旧的号码。其中一些将return编入队列,再次处理。

为了更好地查看发生了什么,您可以更改worker-loop打印每个工人处理的最后一个任务:

(defn worker-loop
  [{:keys [tasks] :as state}]
  (loop [last-task nil]
    (if (have-tasks? tasks)
      (let [task (get-task tasks)]
        ;; (when (< (rand) 0.1)
        ;;   (put-task tasks task)
        (recur task))
      (when last-task
        (println "Last task:" last-task))))
  state)

这也显示了代码中的竞争条件,其中 have-tasks? 看到的任务通常在任务处理接近尾声时调用 get-task 时被其他人占用。

竞争条件可以通过删除 have-tasks? 来解决,而是使用 get-task 中的 return 值 nil 作为没有更多任务可用的信号(目前) .

更新:

据观察,这种竞争条件并不能解释问题。

通过删除 get-task 中可能的竞争条件也无法解决问题,如下所示:

(defn get-task [tasks]
  (dosync
   (first (alter tasks rest))))

但是将 get-task 更改为使用显式锁似乎可以解决问题:

 (defn get-task [tasks]  
   (locking :lock
     (dosync
       (let [task (first @tasks)]
         (alter tasks rest)
         task))))

我问了这个question on the Clojure Google Group,它帮助我找到了答案。

问题是我在 STM 事务中使用了惰性序列。

当我替换这段代码时:

(def tasks (ref (range 1 10000000)))

通过这个:

(def tasks (ref (into [] (range 1 10000000))))

它按预期工作!

在我出现问题的生产代码中,我使用了 Korma 框架,它也是 returns 元组的惰性集合,如我的示例所示。

结论:避免在STM事务中使用惰性数据结构。

这里的问题与代理无关,与懒惰几乎没有任何关系。这是原始代码的简化版本,但仍然存在问题:

(defn f [init]
  (let [state (ref init)
        task (fn []
               (loop [last-n nil]
                 (if-let [n (dosync
                              (let [n (first @state)]
                                (alter state rest)
                                n))]
                   (recur n)
                   (locking :out
                     (println "Last seen:" last-n)))))
        workers (->> (range 0 5)
                     (mapv (fn [_] (Thread. task))))]
    (doseq [w workers] (.start w))
    (doseq [w workers] (.join w))))

(defn r []
  (f (range 1 100000)))

(defn i [] (f (->> (iterate inc 1)
                   (take 100000))))

(defn t []
  (f (->> (range 1 100000)
          (take Integer/MAX_VALUE))))

运行 这段代码表明 it 都是懒惰的,可靠地工作,而 r 可靠地不工作。问题实际上是 range 调用返回的 class 中的并发错误。事实上,该错误已记录在 this Clojure ticket 中,并已从 Clojure 版本 1.9.0-alpha11.

开始修复

错误的快速总结,以防由于某种原因无法访问票证:在 rest 调用 range 结果的内部,有一个很小的机会竞争条件:“flag" that says "the next value has already been computed" was set before the actual value itself, which meant that a second thread could see that flag as true even though the "next value" is still nil. The call to alter would then fix that nil value on the ref. It's been fixed by swapping the two assignment lines.

如果 range 的结果在单个线程中强制实现或包装在另一个惰性序列中,则不会出现该错误。