在执行之前等待另一个函数完成的 Clojure 函数
Clojure function that waits on the completion of another function before executing
我需要一个函数,当使用特定输入参数调用时执行提供的函数 g,但仅在另一个提供的函数 f 使用相同的输入参数完成执行之后。还有一个要求,当多次调用同一个输入args的函数时,f只在第一次调用时执行一次,其他调用等待完成,然后直接执行g。
编辑: 当 运行 在不同线程上并行时,该解决方案应该有效,并且还应该有效地使用线程。例如。阻塞应该基于每个输入而不是整个函数。
我第一次尝试的功能如下:
(defn dependent-func
([f g]
(let [mem (atom {})]
(fn [& args]
(->> (get (locking mem
(swap! mem (fn [latch-map args]
(if (contains? latch-map args)
latch-map
(let [new-latch (CountDownLatch. 1)
new-latch-map (assoc latch-map args new-latch)]
(->> (Thread. #(do (apply f args)
(.countDown new-latch)))
(.start))
new-latch-map))) args)) args)
(.await))
(apply g args)))))
这似乎符合我的要求,并且等待 f 是基于每个输入的,所以我对此比较满意。最初我希望只使用交换!做内存更新但不幸的是交换!明确指出交换中的功能!可以多次调用(我在测试中看到过)。因此,我最终不得不在更新时锁定内存,这真的很难看。
我确信一定有比我更好地利用 Closure 的并发机制的更简洁的方法来执行此操作,但到目前为止我一直无法找到它。
如有任何建议,我们将不胜感激。
谢谢,
马特
Clojure 的 future
、promise
和 deliver
组合非常适合启动一个进程并让多个线程等待它完成。
Future 用于在后台启动一个线程(它可以做更多,虽然在这个例子中我不需要它)
Promise 用于立即 return 一个对象,一旦准备就绪,该对象将包含答案。
Deliver 用于在准备好后提供承诺的答案。
我还将等待部分拆分到它自己的函数中,以使代码更易于理解,这样我就可以使用内置的记忆功能:
这个问题很好地说明了何时使用 promise 和 deliver 而不是简单的 future。
因为我们要在 运行 函数两次不安全的地方使用 memoize,
我们需要注意这两个调用不会在 完全 相同的位置输入 memoize
时间。所以我们只会在进入 memoize 时锁定,而不是持续时间
记忆功能。
hello.core> (def lock [])
#'hello.core/lock
此函数将始终 return 每次调用 f 时相同的未来对象
使用一组给定的参数,除了我们需要通过包装它来使 memoize 安全
在执行锁定的函数中(您也可以为此使用代理)
hello.core> (def wait-for-function-helper
(memoize (fn [f args]
(let [answer (promise)]
(println "waiting for function " f " with args" args)
(future (deliver answer (apply f args)))
answer))))
#'hello.core/wait-for-function-helper
hello.core> (defn wait-for-function [& args]
(locking lock
(apply wait-for-function-helper args)))
#'hello.core/wait-for-function
现在我们编写使用安全记忆的实际 dependent-func 函数,
未来生产,等待功能功能。
hello.core> (defn dependent-func [f g & args]
@(wait-for-function f args)
(apply g args))
#'hello.core/dependent-func
并定义一个慢操作以查看它的实际效果:
hello.core> (defn slow-f-1 [x]
(println "starting slow-f-1")
(Thread/sleep 10000)
(println "finishing slow-f-1")
(dec x))
#'hello.core/slow-f-1
为了测试它,我们想在 完全 同时启动两个相同的功能。
hello.core> (do (future
(println "first" (dependent-func slow-f-1 inc 4)))
(future
(println "second" (dependent-func slow-f-1 inc 4))))
waiting for function
#object[clojure.core$future_call$reify__6736 0x40534083 {:status :pending, :val nil}] with args (4)
#object[hello.core$slow_f_1 0x4f9b3396 hello.core$slow_f_1@4f9b3396]
starting slow-f-1
finishing slow-f-1
second
first
5
5
如果我们再次调用它,我们只会看到 slow-f-1 运行 一次:
hello.core> (do (future
(println "first" (dependent-func slow-f-1 inc 4)))
(future
(println "second" (dependent-func slow-f-1 inc 4))))
#object[clojure.core$future_call$reify__6736 0x3935ea29 {:status :pending, :val nil}]
first 5
second 5
类似这样的答案对您的问题来说要简单得多:
(defn waiter
[f g & args]
(let [f-result (f args)
g-result (g args) ]
(println (format "waiter: f-result=%d g-result=%d" f-result g-result))))
(defn my-f
[args]
(let [result (apply + args)]
(println "my-f running:" result)
result))
; change your orig prob a bit, and define/use my-f-memo instead of the original my-f
(def my-f-memo (memoize my-f))
(defn my-g
[args]
(let [result (apply * args)]
(println "my-g running:" result)
result))
(waiter my-f-memo my-g 2 3 4)
(waiter my-f-memo my-g 2 3 4)
> lein run
my-f running: 9
my-g running: 24
waiter: f-result=9 g-result=24
my-g running: 24
waiter: f-result=9 g-result=24
main - enter
如果稍微更改一下问题陈述并传入第一个函数的记忆版本 f
,解决方案就容易多了。
只需以 (let [...]...) 形式按顺序调用函数,即可在执行第二个函数之前强制完成第一个函数。
此外,您可以强制 waiter
函数为您执行 f
的记忆,但是手动模拟 memoize
已经完成的工作会多一些工作。
更新:原来的问题并没有明确暗示它需要在并发环境中工作。如果多个线程 是 一个问题,只需将 waiter 的定义更改为:
(defn waiter
[f g & args]
(let [f-result (locking f (f args))
g-result (g args) ]
(println (format "waiter: f-result=%d g-result=%d" f-result g-result))))
启动线程到 运行 f
没有什么意义,如果您接下来要做的是等待该线程完成。您也可以只在当前线程上 运行 f
。在那种情况下,您的问题可以很好地分解为两个子问题:
- 如何记忆对
f
的调用而不像标准记忆那样冒并发执行的风险。
- 返回使用该记忆函数的 lambda,然后调用
g
。
让我们以相反的顺序解决这些问题,首先假设 (my-memoize f)
按您的需要工作,然后再编写它:
(defn dependent-func [f g]
(let [f' (my-memoize f)]
(fn [& args]
(apply f' args)
(apply g args))))
有一个称职的 memoize 很简单,对吧?现在,要实现 memoize,您可以做一些事情。您可以像以前一样使用锁定,我认为这是非常合理的,因为您明确希望阻止并发执行;一旦你放弃线程启动业务,它也很容易:
(defn my-memoize [f]
(let [memo (atom {})]
(fn [& args]
(locking memo
(if (contains? @memo args)
(get @memo args)
(get (swap! memo assoc args (apply f args))))))))
或者您可以自己重新发明锁定,方法是在原子中存储延迟,然后让每个调用取消引用它:
(defn my-memoize [f]
(let [memo (atom {})]
(fn [& args]
(-> memo
(swap! update-in [args]
(fn [v]
(or v (delay (apply f args)))))
(get args)
(deref)))))
它是可读的并且"clever",因为它在一个swap!
中完成了所有事情,当我第一次弄明白这一点时我感到很得意,但后来我意识到这只是劫持Delay.deref()
中的互斥锁来完成锁定,所以老实说我认为你还不如使用 locking
来明确有一个锁。
我需要一个函数,当使用特定输入参数调用时执行提供的函数 g,但仅在另一个提供的函数 f 使用相同的输入参数完成执行之后。还有一个要求,当多次调用同一个输入args的函数时,f只在第一次调用时执行一次,其他调用等待完成,然后直接执行g。
编辑: 当 运行 在不同线程上并行时,该解决方案应该有效,并且还应该有效地使用线程。例如。阻塞应该基于每个输入而不是整个函数。
我第一次尝试的功能如下:
(defn dependent-func
([f g]
(let [mem (atom {})]
(fn [& args]
(->> (get (locking mem
(swap! mem (fn [latch-map args]
(if (contains? latch-map args)
latch-map
(let [new-latch (CountDownLatch. 1)
new-latch-map (assoc latch-map args new-latch)]
(->> (Thread. #(do (apply f args)
(.countDown new-latch)))
(.start))
new-latch-map))) args)) args)
(.await))
(apply g args)))))
这似乎符合我的要求,并且等待 f 是基于每个输入的,所以我对此比较满意。最初我希望只使用交换!做内存更新但不幸的是交换!明确指出交换中的功能!可以多次调用(我在测试中看到过)。因此,我最终不得不在更新时锁定内存,这真的很难看。
我确信一定有比我更好地利用 Closure 的并发机制的更简洁的方法来执行此操作,但到目前为止我一直无法找到它。
如有任何建议,我们将不胜感激。
谢谢,
马特
Clojure 的 future
、promise
和 deliver
组合非常适合启动一个进程并让多个线程等待它完成。
Future 用于在后台启动一个线程(它可以做更多,虽然在这个例子中我不需要它)
Promise 用于立即 return 一个对象,一旦准备就绪,该对象将包含答案。
Deliver 用于在准备好后提供承诺的答案。
我还将等待部分拆分到它自己的函数中,以使代码更易于理解,这样我就可以使用内置的记忆功能:
这个问题很好地说明了何时使用 promise 和 deliver 而不是简单的 future。
因为我们要在 运行 函数两次不安全的地方使用 memoize, 我们需要注意这两个调用不会在 完全 相同的位置输入 memoize 时间。所以我们只会在进入 memoize 时锁定,而不是持续时间 记忆功能。
hello.core> (def lock [])
#'hello.core/lock
此函数将始终 return 每次调用 f 时相同的未来对象 使用一组给定的参数,除了我们需要通过包装它来使 memoize 安全 在执行锁定的函数中(您也可以为此使用代理)
hello.core> (def wait-for-function-helper
(memoize (fn [f args]
(let [answer (promise)]
(println "waiting for function " f " with args" args)
(future (deliver answer (apply f args)))
answer))))
#'hello.core/wait-for-function-helper
hello.core> (defn wait-for-function [& args]
(locking lock
(apply wait-for-function-helper args)))
#'hello.core/wait-for-function
现在我们编写使用安全记忆的实际 dependent-func 函数, 未来生产,等待功能功能。
hello.core> (defn dependent-func [f g & args]
@(wait-for-function f args)
(apply g args))
#'hello.core/dependent-func
并定义一个慢操作以查看它的实际效果:
hello.core> (defn slow-f-1 [x]
(println "starting slow-f-1")
(Thread/sleep 10000)
(println "finishing slow-f-1")
(dec x))
#'hello.core/slow-f-1
为了测试它,我们想在 完全 同时启动两个相同的功能。
hello.core> (do (future
(println "first" (dependent-func slow-f-1 inc 4)))
(future
(println "second" (dependent-func slow-f-1 inc 4))))
waiting for function
#object[clojure.core$future_call$reify__6736 0x40534083 {:status :pending, :val nil}] with args (4)
#object[hello.core$slow_f_1 0x4f9b3396 hello.core$slow_f_1@4f9b3396]
starting slow-f-1
finishing slow-f-1
second
first
5
5
如果我们再次调用它,我们只会看到 slow-f-1 运行 一次:
hello.core> (do (future
(println "first" (dependent-func slow-f-1 inc 4)))
(future
(println "second" (dependent-func slow-f-1 inc 4))))
#object[clojure.core$future_call$reify__6736 0x3935ea29 {:status :pending, :val nil}]
first 5
second 5
类似这样的答案对您的问题来说要简单得多:
(defn waiter
[f g & args]
(let [f-result (f args)
g-result (g args) ]
(println (format "waiter: f-result=%d g-result=%d" f-result g-result))))
(defn my-f
[args]
(let [result (apply + args)]
(println "my-f running:" result)
result))
; change your orig prob a bit, and define/use my-f-memo instead of the original my-f
(def my-f-memo (memoize my-f))
(defn my-g
[args]
(let [result (apply * args)]
(println "my-g running:" result)
result))
(waiter my-f-memo my-g 2 3 4)
(waiter my-f-memo my-g 2 3 4)
> lein run
my-f running: 9
my-g running: 24
waiter: f-result=9 g-result=24
my-g running: 24
waiter: f-result=9 g-result=24
main - enter
如果稍微更改一下问题陈述并传入第一个函数的记忆版本 f
,解决方案就容易多了。
只需以 (let [...]...) 形式按顺序调用函数,即可在执行第二个函数之前强制完成第一个函数。
此外,您可以强制 waiter
函数为您执行 f
的记忆,但是手动模拟 memoize
已经完成的工作会多一些工作。
更新:原来的问题并没有明确暗示它需要在并发环境中工作。如果多个线程 是 一个问题,只需将 waiter 的定义更改为:
(defn waiter
[f g & args]
(let [f-result (locking f (f args))
g-result (g args) ]
(println (format "waiter: f-result=%d g-result=%d" f-result g-result))))
启动线程到 运行 f
没有什么意义,如果您接下来要做的是等待该线程完成。您也可以只在当前线程上 运行 f
。在那种情况下,您的问题可以很好地分解为两个子问题:
- 如何记忆对
f
的调用而不像标准记忆那样冒并发执行的风险。 - 返回使用该记忆函数的 lambda,然后调用
g
。
让我们以相反的顺序解决这些问题,首先假设 (my-memoize f)
按您的需要工作,然后再编写它:
(defn dependent-func [f g]
(let [f' (my-memoize f)]
(fn [& args]
(apply f' args)
(apply g args))))
有一个称职的 memoize 很简单,对吧?现在,要实现 memoize,您可以做一些事情。您可以像以前一样使用锁定,我认为这是非常合理的,因为您明确希望阻止并发执行;一旦你放弃线程启动业务,它也很容易:
(defn my-memoize [f]
(let [memo (atom {})]
(fn [& args]
(locking memo
(if (contains? @memo args)
(get @memo args)
(get (swap! memo assoc args (apply f args))))))))
或者您可以自己重新发明锁定,方法是在原子中存储延迟,然后让每个调用取消引用它:
(defn my-memoize [f]
(let [memo (atom {})]
(fn [& args]
(-> memo
(swap! update-in [args]
(fn [v]
(or v (delay (apply f args)))))
(get args)
(deref)))))
它是可读的并且"clever",因为它在一个swap!
中完成了所有事情,当我第一次弄明白这一点时我感到很得意,但后来我意识到这只是劫持Delay.deref()
中的互斥锁来完成锁定,所以老实说我认为你还不如使用 locking
来明确有一个锁。