在执行之前等待另一个函数完成的 Clojure 函数

Clojure function that waits on the completion of another function before executing

我需要一个函数,当使用特定输入参数调用时执行提供的函数 g,但仅在另一个提供的函数 f 使用相同的输入参数完成执行之后。还有一个要求,当多次调用同一个输入args的函数时,f只在第一次调用时执行一次,其他调用等待完成,然后直接执行g。

编辑: 当 运行 在不同线程上并行时,该解决方案应该有效,并且还应该有效地使用线程。例如。阻塞应该基于每个输入而不是整个函数。

我第一次尝试的功能如下:

(defn dependent-func
  ([f g]
   (let [mem (atom {})]
     (fn [& args]
       (->> (get (locking mem
                   (swap! mem (fn [latch-map args]
                                (if (contains? latch-map args)
                                  latch-map
                                  (let [new-latch (CountDownLatch. 1)
                                        new-latch-map (assoc latch-map args new-latch)]
                                    (->> (Thread. #(do (apply f args)
                                                       (.countDown new-latch)))
                                         (.start))
                                new-latch-map))) args)) args)
            (.await))
       (apply g args)))))

这似乎符合我的要求,并且等待 f 是基于每个输入的,所以我对此比较满意。最初我希望只使用交换!做内存更新但不幸的是交换!明确指出交换中的功能!可以多次调用(我在测试中看到过)。因此,我最终不得不在更新时锁定内存,这真的很难看。

我确信一定有比我更好地利用 Closure 的并发机制的更简洁的方法来执行此操作,但到目前为止我一直无法找到它。

如有任何建议,我们将不胜感激。

谢谢,

马特

Clojure 的 futurepromisedeliver 组合非常适合启动一个进程并让多个线程等待它完成。

  • Future 用于在后台启动一个线程(它可以做更多,虽然在这个例子中我不需要它)

  • Promise 用于立即 return 一个对象,一旦准备就绪,该对象将包含答案。

  • Deliver 用于在准备好后提供承诺的答案。

我还将等待部分拆分到它自己的函数中,以使代码更易于理解,这样我就可以使用内置的记忆功能:

这个问题很好地说明了何时使用 promise 和 deliver 而不是简单的 future。

因为我们要在 运行 函数两次不安全的地方使用 memoize, 我们需要注意这两个调用不会在 完全 相同的位置输入 memoize 时间。所以我们只会在进入 memoize 时锁定,而不是持续时间 记忆功能。

hello.core> (def lock [])
#'hello.core/lock

此函数将始终 return 每次调用 f 时相同的未来对象 使用一组给定的参数,除了我们需要通过包装它来使 memoize 安全 在执行锁定的函数中(您也可以为此使用代理)

hello.core> (def wait-for-function-helper             
              (memoize (fn [f args]
                         (let [answer (promise)]
                           (println "waiting for function " f " with args" args)
                           (future (deliver answer (apply f args)))
                           answer))))

#'hello.core/wait-for-function-helper
hello.core> (defn wait-for-function [& args]
              (locking lock
                (apply wait-for-function-helper args)))
#'hello.core/wait-for-function

现在我们编写使用安全记忆的实际 dependent-func 函数, 未来生产,等待功能功能。

hello.core> (defn dependent-func [f g & args]
              @(wait-for-function f args)
              (apply g args))
#'hello.core/dependent-func

并定义一个慢操作以查看它的实际效果:

hello.core> (defn slow-f-1 [x]
              (println "starting slow-f-1")
              (Thread/sleep 10000)
              (println "finishing slow-f-1")
              (dec x))
#'hello.core/slow-f-1

为了测试它,我们想在 完全 同时启动两个相同的功能。

hello.core> (do (future
                  (println "first" (dependent-func slow-f-1 inc 4)))
                (future
                  (println "second" (dependent-func slow-f-1 inc 4))))

waiting for function  
#object[clojure.core$future_call$reify__6736 0x40534083 {:status :pending, :val nil}] with args (4)
#object[hello.core$slow_f_1 0x4f9b3396 hello.core$slow_f_1@4f9b3396]
starting slow-f-1
finishing slow-f-1
second
first
5
5

如果我们再次调用它,我们只会看到 slow-f-1 运行 一次:

hello.core> (do (future
                  (println "first" (dependent-func slow-f-1 inc 4)))
                (future
                  (println "second" (dependent-func slow-f-1 inc 4))))

#object[clojure.core$future_call$reify__6736 0x3935ea29 {:status :pending, :val nil}]
first 5
second 5

类似这样的答案对您的问题来说要简单得多:

(defn waiter
  [f g & args]
  (let [f-result (f args) 
        g-result (g args) ]
    (println (format "waiter: f-result=%d g-result=%d" f-result g-result))))

(defn my-f 
  [args]
  (let [result (apply + args)]
    (println "my-f running:" result)
    result))

; change your orig prob a bit, and define/use my-f-memo instead of the original my-f
(def my-f-memo (memoize my-f))

(defn my-g
  [args]
  (let [result (apply * args)]
    (println "my-g running:" result)
    result))

(waiter my-f-memo my-g 2 3 4)
(waiter my-f-memo my-g 2 3 4)

> lein run
my-f running: 9
my-g running: 24
waiter: f-result=9 g-result=24
my-g running: 24
waiter: f-result=9 g-result=24
main - enter

如果稍微更改一下问题陈述并传入第一个函数的记忆版本 f,解决方案就容易多了。

只需以 (let [...]...) 形式按顺序调用函数,即可在执行第二个函数之前强制完成第一个函数。

此外,您可以强制 waiter 函数为您执行 f 的记忆,但是手动模拟 memoize 已经完成的工作会多一些工作。


更新:原来的问题并没有明确暗示它需要在并发环境中工作。如果多个线程 一个问题,只需将 waiter 的定义更改为:

(defn waiter
  [f g & args]
  (let [f-result (locking f (f args))
        g-result (g args) ]
    (println (format "waiter: f-result=%d g-result=%d" f-result g-result))))

启动线程到 运行 f 没有什么意义,如果您接下来要做的是等待该线程完成。您也可以只在当前线程上 运行 f。在那种情况下,您的问题可以很好地分解为两个子问题:

  1. 如何记忆对 f 的调用而不像标准记忆那样冒并发执行的风险。
  2. 返回使用该记忆函数的 lambda,然后调用 g

让我们以相反的顺序解决这些问题,首先假设 (my-memoize f) 按您的需要工作,然后再编写它:

(defn dependent-func [f g]
  (let [f' (my-memoize f)]
    (fn [& args]
      (apply f' args)
      (apply g args))))

有一个称职的 memoize 很简单,对吧?现在,要实现 memoize,您可以做一些事情。您可以像以前一样使用锁定,我认为这是非常合理的,因为您明确希望阻止并发执行;一旦你放弃线程启动业务,它也很容易:

(defn my-memoize [f]
  (let [memo (atom {})]
    (fn [& args]
      (locking memo
        (if (contains? @memo args)
          (get @memo args)
          (get (swap! memo assoc args (apply f args))))))))

或者您可以自己重新发明锁定,方法是在原子中存储延迟,然后让每个调用取消引用它:

(defn my-memoize [f]
  (let [memo (atom {})]
    (fn [& args]
      (-> memo
          (swap! update-in [args]
                 (fn [v]
                   (or v (delay (apply f args)))))
          (get args)
          (deref)))))

它是可读的并且"clever",因为它在一个swap!中完成了所有事情,当我第一次弄明白这一点时我感到很得意,但后来我意识到这只是劫持Delay.deref() 中的互斥锁来完成锁定,所以老实说我认为你还不如使用 locking 来明确有一个锁。