Clojure - 滑动 Window 最小记录时间
Clojure - Sliding Window Minimum in Log Time
给定向量大小 n 和 window 大小 k,我如何才能在 n log k 时间内有效计算滑动 window 最小值?即,对于向量 [1 4 3 2 5 4 2] 和 window 大小 2,输出将是 [1 3 2 2 4 2].
显然我可以使用分区和映射来完成,但那是 n * k 时间。
我想我需要跟踪排序地图中的最小值,并在地图超出 window 时更新地图。但是,虽然我可以在日志时间中获得排序映射的最小值,但是通过映射查找任何过期的索引并不是日志时间。
谢谢。
您可以使用基于 Clojure's priority map 数据结构的优先级队列来解决此问题。我们用它们在向量中的位置索引 window 中的值。
- 其第一个条目的值是 window 最小值。
- 我们通过 key/vector-position 添加新条目并删除最旧的条目。
一个可能的实现是
(use [clojure.data.priority-map :only [priority-map]])
(defn windowed-min [k coll]
(let [numbered (map-indexed list coll)
[head tail] (split-at k numbered)
init-win (into (priority-map) head)
win-seq (reductions
(fn [w [i n]]
(-> w (dissoc (- i k)) (assoc i n)))
init-win
tail)]
(map (comp val first) win-seq)))
例如,
(windowed-min 2 [1 4 3 2 5 4 2])
=> (1 3 2 2 4 2)
解决方案是延迟开发的,因此可以应用于无穷无尽的序列。
初始化后,即 O(k),函数计算 O(log k) 序列中的每个元素时间,如前所述 here.
我的解决方案使用两个辅助地图来实现快速性能。我将键映射到它们的值,并将值存储到它们在排序映射中的出现。在 window 的每一次移动中,我都会更新地图,并获得排序地图的最小值,所有这些都在对数时间内完成。
缺点是代码更丑陋,不懒惰,也不惯用。好处是它比优先级映射解决方案高出约 2 倍。不过,我认为很多原因都可以归咎于上述解决方案的懒惰。
(defn- init-aux-maps [w v]
(let [sv (subvec v 0 w)
km (->> sv (map-indexed vector) (into (sorted-map)))
vm (->> sv frequencies (into (sorted-map)))]
[km vm]))
(defn- update-aux-maps [[km vm] j x]
(let [[ai av] (first km)
km (-> km (dissoc ai) (assoc j x))
vm (if (= (vm av) 1) (dissoc vm av) (update vm av dec))
vm (if (nil? (get vm x)) (assoc vm x 1) (update vm x inc))]
[km vm]))
(defn- get-minimum [[_ vm]] (ffirst vm))
(defn sliding-minimum [w v]
(loop [i 0, j w, am (init-aux-maps w v), acc []]
(let [acc (conj acc (get-minimum am))]
(if (< j (count v))
(recur (inc i) (inc j) (update-aux-maps am j (v j)) acc)
acc))))
您可以在线性时间内求解 --O(n),而不是如 1 所述的 O(n*log k))。http://articles.leetcode.com/sliding-window-maximum/ (easily change from find max to find min) and 2. https://people.cs.uct.ac.za/~ksmith/articles/sliding_window_minimum.html
这些方法需要一个双端队列来管理以前的值,这在大多数队列操作(即 push/pop/peek 等)中使用 O(1) 时间,而不是在使用优先级队列时使用 O(log K)(即优先地图)。我使用了来自 https://github.com/pjstadig/deque-clojure
的双端队列
上面第一个参考中实现代码的主要代码(对于最小值而不是最大值):
(defn windowed-min-queue [w a]
(let [
deque-init (fn deque-init [] (reduce (fn [dq i]
(dq-push-back i (prune-back a i dq)))
empty-deque (range w)))
process-min (fn process-min [dq] (reductions (fn [q i]
(->> q
(prune-back a i)
(prune-front i w)
(dq-push-back i)))
dq (range w (count a))))
init (deque-init)
result (process-min init)] ;(process-min init)]
(map #(nth a (dq-front %)) result)))
将此方法的速度与我们现有的使用优先级映射的其他解决方案进行比较(注意:我喜欢其他解决方案,因为它更简单)。
; Test using Random arrays of data
(def N 1000000)
(def a (into [] (take N (repeatedly #(rand-int 50)))))
(def b (into [] (take N (repeatedly #(rand-int 50)))))
(def w 1024)
; Solution based upon Priority Map (see other solution which is also great since its simpler)
(time (doall (windowed-min-queue w a)))
;=> "Elapsed time: 1820.526521 msecs"
; Solution based upon double-ended queue
(time (doall (windowed-min w b)))
;=> "Elapsed time: 8290.671121 msecs"
速度快了 4 倍多,考虑到 PriorityMap 是用 Java 编写的,而双端队列代码是纯 Clojure(参见 https://github.com/pjstadig/deque-clojure)
包括另一个wrappers/utilities在双端队列上使用,供参考
(defn dq-push-front [e dq]
(conj dq e))
(defn dq-push-back [e dq]
(proto/inject dq e))
(defn dq-front [dq]
(first dq))
(defn dq-pop-front [dq]
(pop dq))
(defn dq-pop-back [dq]
(proto/eject dq))
(defn deque-empty? [dq]
(identical? empty-deque dq))
(defn dq-back [dq]
(proto/last dq))
(defn dq-front [dq]
(first dq))
(defn prune-back [a i dq]
(cond
(deque-empty? dq) dq
(< (nth a i) (nth a (dq-back dq))) (recur a i (dq-pop-back dq))
:else dq))
(defn prune-front [i w dq]
(cond
(deque-empty? dq) dq
(<= (dq-front dq) (- i w)) (recur i w (dq-pop-front dq))
:else dq))
给定向量大小 n 和 window 大小 k,我如何才能在 n log k 时间内有效计算滑动 window 最小值?即,对于向量 [1 4 3 2 5 4 2] 和 window 大小 2,输出将是 [1 3 2 2 4 2].
显然我可以使用分区和映射来完成,但那是 n * k 时间。
我想我需要跟踪排序地图中的最小值,并在地图超出 window 时更新地图。但是,虽然我可以在日志时间中获得排序映射的最小值,但是通过映射查找任何过期的索引并不是日志时间。
谢谢。
您可以使用基于 Clojure's priority map 数据结构的优先级队列来解决此问题。我们用它们在向量中的位置索引 window 中的值。
- 其第一个条目的值是 window 最小值。
- 我们通过 key/vector-position 添加新条目并删除最旧的条目。
一个可能的实现是
(use [clojure.data.priority-map :only [priority-map]])
(defn windowed-min [k coll]
(let [numbered (map-indexed list coll)
[head tail] (split-at k numbered)
init-win (into (priority-map) head)
win-seq (reductions
(fn [w [i n]]
(-> w (dissoc (- i k)) (assoc i n)))
init-win
tail)]
(map (comp val first) win-seq)))
例如,
(windowed-min 2 [1 4 3 2 5 4 2])
=> (1 3 2 2 4 2)
解决方案是延迟开发的,因此可以应用于无穷无尽的序列。
初始化后,即 O(k),函数计算 O(log k) 序列中的每个元素时间,如前所述 here.
我的解决方案使用两个辅助地图来实现快速性能。我将键映射到它们的值,并将值存储到它们在排序映射中的出现。在 window 的每一次移动中,我都会更新地图,并获得排序地图的最小值,所有这些都在对数时间内完成。
缺点是代码更丑陋,不懒惰,也不惯用。好处是它比优先级映射解决方案高出约 2 倍。不过,我认为很多原因都可以归咎于上述解决方案的懒惰。
(defn- init-aux-maps [w v]
(let [sv (subvec v 0 w)
km (->> sv (map-indexed vector) (into (sorted-map)))
vm (->> sv frequencies (into (sorted-map)))]
[km vm]))
(defn- update-aux-maps [[km vm] j x]
(let [[ai av] (first km)
km (-> km (dissoc ai) (assoc j x))
vm (if (= (vm av) 1) (dissoc vm av) (update vm av dec))
vm (if (nil? (get vm x)) (assoc vm x 1) (update vm x inc))]
[km vm]))
(defn- get-minimum [[_ vm]] (ffirst vm))
(defn sliding-minimum [w v]
(loop [i 0, j w, am (init-aux-maps w v), acc []]
(let [acc (conj acc (get-minimum am))]
(if (< j (count v))
(recur (inc i) (inc j) (update-aux-maps am j (v j)) acc)
acc))))
您可以在线性时间内求解 --O(n),而不是如 1 所述的 O(n*log k))。http://articles.leetcode.com/sliding-window-maximum/ (easily change from find max to find min) and 2. https://people.cs.uct.ac.za/~ksmith/articles/sliding_window_minimum.html
这些方法需要一个双端队列来管理以前的值,这在大多数队列操作(即 push/pop/peek 等)中使用 O(1) 时间,而不是在使用优先级队列时使用 O(log K)(即优先地图)。我使用了来自 https://github.com/pjstadig/deque-clojure
的双端队列上面第一个参考中实现代码的主要代码(对于最小值而不是最大值):
(defn windowed-min-queue [w a]
(let [
deque-init (fn deque-init [] (reduce (fn [dq i]
(dq-push-back i (prune-back a i dq)))
empty-deque (range w)))
process-min (fn process-min [dq] (reductions (fn [q i]
(->> q
(prune-back a i)
(prune-front i w)
(dq-push-back i)))
dq (range w (count a))))
init (deque-init)
result (process-min init)] ;(process-min init)]
(map #(nth a (dq-front %)) result)))
将此方法的速度与我们现有的使用优先级映射的其他解决方案进行比较(注意:我喜欢其他解决方案,因为它更简单)。
; Test using Random arrays of data
(def N 1000000)
(def a (into [] (take N (repeatedly #(rand-int 50)))))
(def b (into [] (take N (repeatedly #(rand-int 50)))))
(def w 1024)
; Solution based upon Priority Map (see other solution which is also great since its simpler)
(time (doall (windowed-min-queue w a)))
;=> "Elapsed time: 1820.526521 msecs"
; Solution based upon double-ended queue
(time (doall (windowed-min w b)))
;=> "Elapsed time: 8290.671121 msecs"
速度快了 4 倍多,考虑到 PriorityMap 是用 Java 编写的,而双端队列代码是纯 Clojure(参见 https://github.com/pjstadig/deque-clojure)
包括另一个wrappers/utilities在双端队列上使用,供参考
(defn dq-push-front [e dq]
(conj dq e))
(defn dq-push-back [e dq]
(proto/inject dq e))
(defn dq-front [dq]
(first dq))
(defn dq-pop-front [dq]
(pop dq))
(defn dq-pop-back [dq]
(proto/eject dq))
(defn deque-empty? [dq]
(identical? empty-deque dq))
(defn dq-back [dq]
(proto/last dq))
(defn dq-front [dq]
(first dq))
(defn prune-back [a i dq]
(cond
(deque-empty? dq) dq
(< (nth a i) (nth a (dq-back dq))) (recur a i (dq-pop-back dq))
:else dq))
(defn prune-front [i w dq]
(cond
(deque-empty? dq) dq
(<= (dq-front dq) (- i w)) (recur i w (dq-pop-front dq))
:else dq))