为什么在 Clojure 中更改 ref 时会调用两次通勤函数?

Why is the commute function called twice when changing a ref in Clojure?

我想我理解 Clojure 事务中 commutealter 思想的基本区别。

alter本质上是'locks'事务从开始到结束的标识,所以多个事务必须顺序执行。

commute 仅将锁应用于身份值的实际更改,因此交易中的其他操作可能 运行 在不同的时间和不同的世界观。

但我对某事感到困惑。让我们定义一个有副作用的函数和一个作用于其的引用:

(defn fn-with-side-effects [state]
    (println "Hello!")
    (inc state))

(def test-counter (ref 0))

现在如果我们使用 alter,我们会看到预期的行为:

user=> (dosync (alter test-counter fn-with-side-effects))
Hello!
1

但是如果我们使用通勤:

user=> (dosync (ref-set test-counter 0))
0
user=> (dosync (commute test-counter fn-with-side-effects))
Hello!
Hello! ; hello is printed twice!
1

所以在 commute 版本中,函数显然只修改了 ref 一次,因为最终值为 1。但是修饰函数的副作用被执行了两次。为什么会这样?

我想通了。

发生这种情况是因为通勤功能总是执行两次

Commute 比 alter 允许更多的潜在并发,因为它不会在整个交易期间锁定身份。

相反,它会在交易开始时读取身份的值一次,并在调用通勤操作时读取它 returns 通勤功能应用于此值。

这个值现在完全有可能已经过时,因为在事务开始和通勤功能执行之间的某个时间,其他线程可能已经更改了它。

但是,完整性得以保持,因为通勤功能在实际修改引用时在提交时再次执行。

这个网站对区别有很清楚的解释:http://squirrel.pl/blog/2010/07/13/clojure-alter-vs-commute/

In fact, when commute is called it instantly returns result of running the function on the ref. At the very end of transaction it performs the calculation again, this time synchronously (like alter) updating the ref. That’s why eventually the counter value is 51 even though the last thread printed 45.

所以如果你的通勤函数有副作用要小心,因为它们会被执行两次!!

为了了解 commute 的工作原理,我做了一些实验。我想把我的解释分成三个部分:

  • 比较和设置语义
  • alter
  • commute

比较和设置语义

我觉得Clojure for the Brave and True已经解释的很好了:

swap! implements "compare-and-set" semantics, meaning it does the following internally:

  1. It reads the current state of the atom
  2. It then applies the update function to that state
  3. Next, it checks whether the value it read in step 1 is identical to the atom's current value
  4. If it is, then swap! updates the atom to refer to the result of step 2
  5. If it isn't, then swap! retries, going through the process again with step 1.

swap! 用于 atom,但知道它会帮助我们理解 altercommute,因为它们使用类似的方法更新 ref.

atom 不同,ref 修改(通过 altercommuteref-set)必须包含在事务中。当事务开始(或重试)时,它将捕获所有包含 ref 的快照(因为 alter 需要它)。 ref只有事务提交后才会修改。

alter

在一个交易中,所有alter将被修改的ref组成一个组。如果组内ref中任意一个修改失败,则重试交易。基本上 alter 执行以下操作:

  1. 将其更改 ref 与事务捕获的快照进行比较。如果它们看起来不同,请重试交易;否则
  2. 使用提供的函数从快照创建一个新状态
  3. 再次与快照进行比较ref。如果它们看起来不同,请重试交易;否则
  4. 尝试对 ref 进行写锁定,在此交易试用结束之前不要让任何人修改它。如果失败(ref 已经被锁定),等待一段时间(例如 100 毫秒),然后重试事务。
  5. 告诉事务在执行委托时将此 ref 更新为新状态。

让我们演示一下平滑的改变。首先,我们将创建一个线程 t1alter 3 个计数器 c1c2c3 以及 slow-inc

(ns testing.core)

(def start (atom 0)) ; Record start time.

(def c1 (ref 0)) ; Counter 1
(def c2 (ref 0)) ; Counter 2
(def c3 (ref 0)) ; Counter 3

(defn milliTime 
  "Get current time in millisecond."
  []
  (int (/ (System/nanoTime) 1000000)))

(defn lap 
  "Get elapse time since 'start' in millisecond."
  []
  (- (milliTime) @start))

(defn slow-inc
  "Slow increment, takes 1 second."
  [x x-name]
  (println "slow-inc beg" x-name ":" x "|" (lap) "ms")
  (Thread/sleep 1000)
  (println "slow-inc end" x-name ":" (inc x) "|" (lap) "ms")
  (inc x))

(defn fast-inc
  "Fast increment. The value it prints is incremented."
  [x x-name]
  (println "fast-inc    " x-name ":" (inc x) "|" (lap) "ms")
  (inc x))

(defn -main
  []
  ;; Initialize c1, c2, c3 and start.
  (dosync (ref-set c1 0) 
          (ref-set c2 0)
          (ref-set c3 0))
  (reset! start (milliTime))

  ;; Start two new threads simultaneously.
  (let [t1 (future
             (dosync
               (println "transaction start   |" (lap) "ms")
               (alter c1 slow-inc "c1")
               (alter c2 slow-inc "c2")
               (alter c3 slow-inc "c3")
               (println "transaction end     |" (lap) "ms")))
        t2 (future)]

    ;; Dereference all of them (wait until all 2 threads finish).
    @t1 @t2 

    ;; Print final counters' values.
    (println "c1 :" @c1)
    (println "c2 :" @c2)
    (println "c3 :" @c3)))

我们得到了这个:

transaction start   | 3 ms    ; 1st try
slow-inc beg c1 : 0 | 8 ms
slow-inc end c1 : 1 | 1008 ms
slow-inc beg c2 : 0 | 1009 ms
slow-inc end c2 : 1 | 2010 ms
slow-inc beg c3 : 0 | 2010 ms
slow-inc end c3 : 1 | 3011 ms
transaction end     | 3012 ms
c1 : 1
c2 : 1
c3 : 1

过程顺利。没有惊喜。

让我们看看如果ref(比方说,c3)在它的改变((alter c3 ...))之前被修改会发生什么。我们将在更改 c1 期间对其进行修改。将lett2的绑定编辑为:

t2 (future
     (Thread/sleep 900) ; Increment at 900 ms
     (dosync (alter c3 fast-inc "c3")))

结果:

transaction start   | 2 ms    ; 1st try
slow-inc beg c1 : 0 | 7 ms
fast-inc     c3 : 1 | 904 ms  ; c3 being modified in thread t2
slow-inc end c1 : 1 | 1008 ms
slow-inc beg c2 : 0 | 1009 ms
slow-inc end c2 : 1 | 2010 ms
transaction start   | 2011 ms ; 2nd try
slow-inc beg c1 : 0 | 2011 ms
slow-inc end c1 : 1 | 3012 ms
slow-inc beg c2 : 0 | 3013 ms
slow-inc end c2 : 1 | 4014 ms
slow-inc beg c3 : 1 | 4015 ms
slow-inc end c3 : 2 | 5016 ms
transaction end     | 5016 ms
c1 : 1
c2 : 1
c3 : 2

如您所见,在 1st-try-(alter c3 ...) 的第 1 步中,它实现 c3 (val = 1) 看起来与事务捕获的快照 (val = 0) 不同,所以它重试交易。

现在,如果 ref(比方说,c1)在期间被修改((alter c1 ...))怎么办?我们将在线程 t2 上修改 c1。将lett2的绑定编辑为:

t2 (future
     (Thread/sleep 900) ; Increment at 900 ms
     (dosync (alter c1 fast-inc "c1")))

结果:

transaction start   | 3 ms    ; 1st try
slow-inc beg c1 : 0 | 8 ms
fast-inc     c1 : 1 | 904 ms  ; c1 being modified in thread t2
slow-inc end c1 : 1 | 1008 ms
transaction start   | 1009 ms ; 2nd try
slow-inc beg c1 : 1 | 1009 ms
slow-inc end c1 : 2 | 2010 ms
slow-inc beg c2 : 0 | 2011 ms
slow-inc end c2 : 1 | 3011 ms
slow-inc beg c3 : 0 | 3012 ms
slow-inc end c3 : 1 | 4013 ms
transaction end     | 4014 ms
c1 : 2
c2 : 1
c3 : 1

这次在1st-try-(alter c1 ...)的第3步,发现ref被修改了,所以请求事务重试。

现在,让我们尝试修改 ref(比方说,c1 它的改变((alter c1 ...))之后。我们将在更改 c2.

期间对其进行修改
t2 (future
     (Thread/sleep 1600) ; Increment at 1600 ms
     (dosync (alter c1 fast-inc "c1")))

结果:

transaction start   | 3 ms    ; 1st try
slow-inc beg c1 : 0 | 8 ms
slow-inc end c1 : 1 | 1009 ms
slow-inc beg c2 : 0 | 1010 ms
fast-inc     c1 : 1 | 1604 ms ; try to modify c1 in thread t2, but failed
fast-inc     c1 : 1 | 1705 ms ; keep trying...
fast-inc     c1 : 1 | 1806 ms
fast-inc     c1 : 1 | 1908 ms
fast-inc     c1 : 1 | 2009 ms
slow-inc end c2 : 1 | 2011 ms
slow-inc beg c3 : 0 | 2012 ms
fast-inc     c1 : 1 | 2110 ms ; still trying...
fast-inc     c1 : 1 | 2211 ms
fast-inc     c1 : 1 | 2312 ms
fast-inc     c1 : 1 | 2413 ms
fast-inc     c1 : 1 | 2514 ms
fast-inc     c1 : 1 | 2615 ms
fast-inc     c1 : 1 | 2716 ms
fast-inc     c1 : 1 | 2817 ms
fast-inc     c1 : 1 | 2918 ms ; and trying....
slow-inc end c3 : 1 | 3012 ms
transaction end     | 3013 ms ; 1st try ended, transaction committed.
fast-inc     c1 : 2 | 3014 ms ; finally c1 modified successfully
c1 : 2
c2 : 1
c3 : 1

由于1st-try-(alter c1 ...)已经锁定c1(第4步),所以没有人可以修改c1,直到本轮交易试用结束。

alter 就这些了。

那么,如果我们不希望 c1c2c3 全部组合在一起怎么办?假设我想在 c1c3 更改失败(在事务期间被其他线程修改)时重试事务 only。我不关心 c2 的状态。如果在交易过程中修改了c2,就不需要重试交易,这样可以节省一些时间。我们如何做到这一点?是的,通过 commute.

commute

基本上,commute 执行以下操作:

  1. 运行 函数提供 直接使用 ref(不是来自快照)但不要对结果做任何事情。
  2. 要求事务在事务提交前用相同的参数调用real-commute。 (real-commute 只是我杜撰的名字。)

我实际上不知道为什么 commute 必须 运行 步骤 1。在我看来,仅步骤 2 就足够了。 real-commute 执行以下操作:

  1. 读写锁定 ref 直到此事务试用结束(如果尚未锁定),否则重试事务。
  2. 使用给定函数从 ref 创建一个新状态
  3. 告诉事务在执行委托时将此 ref 更新为新状态。

让我们检查一下。将 let 的绑定编辑为:

t1 (future
     (dosync
       (println "transaction start   |" (lap) "ms")
       (alter c1 slow-inc "c1")
       (commute c2 slow-inc "c2") ; changed to commute
       (alter c3 slow-inc "c3")
       (println "transaction end     |" (lap) "ms")))
t2 (future)

结果:

transaction start   | 3 ms
slow-inc beg c1 : 0 | 7 ms    ; called by alter
slow-inc end c1 : 1 | 1008 ms
slow-inc beg c2 : 0 | 1009 ms ; called by commute
slow-inc end c2 : 1 | 2009 ms
slow-inc beg c3 : 0 | 2010 ms ; called by alter
slow-inc end c3 : 1 | 3011 ms
transaction end     | 3012 ms
slow-inc beg c2 : 0 | 3012 ms ; called by real-commute
slow-inc end c2 : 1 | 4012 ms
c1 : 1
c2 : 1
c3 : 1

因此,如果您使用 commuteslow-inc 会在事务提交之前被调用两次,一次被 commute 调用,一次被 real-commute 调用。第一个 commute 没有对 slow-inc 的结果做任何事情。

slow-inc 可以调用两次以上。例如,让我们尝试在线程 t2:

上修改 c3
t2 (future
     (Thread/sleep 500) ; modify c3 at 500 ms
     (dosync (alter c3 fast-inc "c3")))

结果:

transaction start   | 2 ms
slow-inc beg c1 : 0 | 8 ms
fast-inc     c3 : 1 | 504 ms  ; c3 modified at thread t2
slow-inc end c1 : 1 | 1008 ms
slow-inc beg c2 : 0 | 1009 ms ; 1st time
slow-inc end c2 : 1 | 2010 ms
transaction start   | 2012 ms
slow-inc beg c1 : 0 | 2012 ms
slow-inc end c1 : 1 | 3013 ms
slow-inc beg c2 : 0 | 3014 ms ; 2nd time
slow-inc end c2 : 1 | 4015 ms
slow-inc beg c3 : 1 | 4016 ms
slow-inc end c3 : 2 | 5016 ms
transaction end     | 5017 ms
slow-inc beg c2 : 0 | 5017 ms ; 3rd time
slow-inc end c2 : 1 | 6018 ms
c1 : 1
c2 : 1
c3 : 2

事务第一次尝试,在(commute c2 ...)求值后,(alter c3 ...)发现c3与快照不一样,从而触发事务重试。如果(alter c3 ...)(commute c2 ...)之前,那么会在评估或(commute c2 ..)之前触发重试。所以,将所有 commute 放在 alter 之后 可能会节省你一些时间。

让我们看看如果我们在线程 t2 中修改 c2,同时正在评估 t1 中的事务会发生什么。

t2 (future
     (Thread/sleep 500) ; before evaluation of (commute c2 ...)
     (dosync (alter c2 fast-inc "c2"))
     (Thread/sleep 1000) ; during evaluation of (commute c2 ...)
     (dosync (alter c2 fast-inc "c2"))
     (Thread/sleep 1000) ; after evaluation of (commute c2 ...)
     (dosync (alter c2 fast-inc "c2")))

结果:

transaction start   | 3 ms
slow-inc beg c1 : 0 | 9 ms
fast-inc     c2 : 1 | 504 ms  ; before
slow-inc end c1 : 1 | 1009 ms
slow-inc beg c2 : 1 | 1010 ms
fast-inc     c2 : 2 | 1506 ms ; during
slow-inc end c2 : 2 | 2011 ms
slow-inc beg c3 : 0 | 2012 ms
fast-inc     c2 : 3 | 2508 ms ; after
slow-inc end c3 : 1 | 3013 ms
transaction end     | 3013 ms
slow-inc beg c2 : 3 | 3014 ms
slow-inc end c2 : 4 | 4014 ms
c1 : 1
c2 : 4
c3 : 1

如您所见,没有交易重试,并且c2仍然更新为我们的预期值(4),感谢real-commute

现在我想演示一下real-commute中第1步的效果:它的ref是读写锁定的。首先,确认它是读锁的:

t2 (future
     (Thread/sleep 3500) ; during real-commute
     (println "try to read c2:" @c2 "  |" (lap) "ms"))

结果:

transaction start   | 3 ms
slow-inc beg c1 : 0 | 9 ms
slow-inc end c1 : 1 | 1010 ms
slow-inc beg c2 : 0 | 1010 ms
slow-inc end c2 : 1 | 2011 ms
slow-inc beg c3 : 0 | 2012 ms
slow-inc end c3 : 1 | 3012 ms
transaction end     | 3013 ms
slow-inc beg c2 : 0 | 3013 ms
slow-inc end c2 : 1 | 4014 ms
try to read c2: 1   | 4015 ms ; got printed after transaction trial ended
c1 : 1
c2 : 1
c3 : 1

@c2 被封锁,直到 c2 解锁。这就是为什么 println 在 4000 毫秒后得到评估,即使我们的命令是休眠 3500 毫秒。

由于 commutealter 需要读取他们的 ref 来执行给定的功能,他们将被阻止,直到他们的 ref 也被解锁。您可以尝试将 (println ...) 替换为 (alter c2 fast-inc "c2")。效果应该和这个例子一样。

所以,为了确认它是写锁定的,我们可以使用ref-set:

t2 (future
     (Thread/sleep 3500) ; during real-commute
     (dosync (ref-set c2 (fast-inc 9 " 8"))))

结果:

transaction start   | 3 ms
slow-inc beg c1 : 0 | 8 ms
slow-inc end c1 : 1 | 1008 ms
slow-inc beg c2 : 0 | 1010 ms
slow-inc end c2 : 1 | 2011 ms
slow-inc beg c3 : 0 | 2012 ms
slow-inc end c3 : 1 | 3013 ms
transaction end     | 3014 ms
slow-inc beg c2 : 0 | 3014 ms
fast-inc      8 : 9 | 3504 ms ; try to ref-set but failed
fast-inc      8 : 9 | 3605 ms ; try again...
fast-inc      8 : 9 | 3706 ms
fast-inc      8 : 9 | 3807 ms
fast-inc      8 : 9 | 3908 ms
fast-inc      8 : 9 | 4009 ms
slow-inc end c2 : 1 | 4015 ms
fast-inc      8 : 9 | 4016 ms ; finally success, c2 ref-set to 9
c1 : 1
c2 : 9
c3 : 1

从这里您还可以猜出 ref-set 的作用:

  • 如果其ref已被写锁定,则在一段时间后(例如100 ms)重试事务;否则告诉交易在执行佣金时将此 ref 更新为给定值。

real-commute 也会失败,当它的 ref 在第 1 步被锁定时。与 alterref-set 不同,它不会等待一段时间重试交易。如果 ref 被锁定的时间过长,这可能会导致问题。例如,我们将尝试修改 c1 后,使用 commute:

t2 (future
     (Thread/sleep 2500) ; during alteration of c3
     (dosync (commute c1 fast-inc "c1")))

结果:

transaction start   | 3 ms
slow-inc beg c1 : 0 | 8 ms
slow-inc end c1 : 1 | 1008 ms
slow-inc beg c2 : 0 | 1010 ms
slow-inc end c2 : 1 | 2011 ms
slow-inc beg c3 : 0 | 2012 ms
fast-inc     c1 : 1 | 2506 ms
fast-inc     c1 : 1 | 2506 ms
fast-inc     c1 : 1 | 2506 ms
...

Exception in thread "main" java.util.concurrent.ExecutionException:
  java.lang.RuntimeException: Transaction failed after reaching retry
  limit, compiling: ...

回想一下 c1 在更改后被 alter 写锁定,因此 real-commute 不断失败并不断重试事务。没有缓冲时间,达到交易重试上限,暴涨。

备注

commute 通过让用户减少 ref 来帮助提高并发性,这将导致事务重试,调用给定函数至少两次以更新其 ref 的成本。在某些情况下 commute 可能比 alter 慢。例如,当事务中唯一要做的事情是更新 refcommute 的成本高于 alter:

(def c (ref 0)) ; counter

(defn slow-inc
  [x]
  (Thread/sleep 1000)
  (inc x))

(defn add-2
  "Create two threads to slow-inc c simultaneously with func.
  func can be alter or commute."
  [func]
  (let [t1 (future (dosync (func c slow-inc)))
        t2 (future (dosync (func c slow-inc)))]
    @t1 @t2))

(defn -main
  [& args]
  (dosync (ref-set c 0))
  (time (add-2 alter))
  (dosync (ref-set c 0))
  (time (add-2 commute)))

结果:

"Elapsed time: 2003.239891 msecs" ; alter
"Elapsed time: 4001.073448 msecs" ; commute

以下是alter的程序:

  • 0 毫秒:t1alter 开始。
  • 1 毫秒:t2alter 开始。
  • 1000 毫秒:t1alter 成功,t1 提交,c 变为 1.
  • 1001 毫秒:t2alter 发现 c 与其快照(步骤 2)不同,重试事务。
  • 2001 毫秒:t2alter 成功,t2 提交,c 变为 2.

commute的程序:

  • 0 毫秒:t1commute 开始。
  • 1 毫秒:t2commute 开始。
  • 1000 毫秒:t1real-commute 已启动。 c 已锁定。
  • 1001 毫秒:t2real-commute 已启动。它发现 c 已被锁定,因此它重试事务(步骤 1)。
  • 1002 ms: t2commute启动了,但是c被锁定了,所以被阻塞了。
  • 2000 毫秒:t1real-commute 结束,事务已提交。 c变成了1。t2解封了。
  • 3002 毫秒:t2real-commute 开始。
  • 4002 毫秒:t2real-commute 结束,事务已提交。 c 变成了 2.

这就是本例中 commutealter 慢的原因。

这看起来可能与 clojuredocs.org 中的 example of commute 相矛盾。关键区别在于,在他的示例中,延迟(100 毫秒)发生在事务主体中,但在我的示例中延迟发生在 slow-inc 中。这种差异导致他的real-commute阶段运行非常快,从而减少锁定时间和阻塞时间。更少的锁定时间意味着更少的重试概率。这就是为什么在他的示例中,commutealter 快。将他的 inc 更改为 slow-inc,您将得到与我相同的观察结果。

就这些了。