在 Clojure 中,我怎样才能使用转换器实现“频率”的高性能版本?

In Clojure, how can I do a performant version of `frequencies` with transducers?

(问题来源:Fernando Abrao。)

我听说 Clojure 中转换器的性能优势,但我不确定如何使用它们。

假设我有一个 qos/device-qos-range 函数,该函数 returns 映射序列,其中一些包含小数 :samplevalue,如下所示:

[
  { :samplevalue 1.3, ... },
  { :othervalue -27.7, ... },
  { :samplevalue 7.5, ... },
  { :samplevalue 1.9, ... },
]

我想看看有多少 :samplevalue 落入每个整数 bin,如下所示:

(frequencies
  (reduce #(if (not (nil? (:samplevalue %2)))
             (conj %1 (.intValue (:samplevalue %2))))
          []
          (qos/device-qos-range origem device qos alvo inicio fim)))

;; => {1 2, 7 1}

如何将其转换为带有消除中间数据结构(例如 reduce 返回的数据结构)的转换器的快速版本?可以利用多核进行并行处理的代码的加分项。

(答案来源:Renzo Borgatti(@reborg)。)

首先,让我们设置一些示例数据,稍后我们将使用这些数据进行性能测试。该向量包含 500k 个具有相同键的映射。值有 1/5 的时间重叠。

(def data 
 (mapv hash-map 
       (repeat :samplevalue) 
       (concat (range 1e5)
               (range 1e5)
               (range 1e5)
               (range 1e5)
               (range 1e5))))

现在让我们用转换器进行转换。请注意,此解决方案是 而非 并行的。我将您的 .intValue 缩短为 int,其作用相同。此外,从每个映射中有条件地获取 :samplevalue 可以缩短为 (keep :samplevalue sequence),相当于 (remove nil? (map :samplevalue sequence))。我们将使用 Criterium 进行基准测试。

(require '[criterium.core :refer [quick-bench]])
(quick-bench
  (transduce
    (comp
      (keep :samplevalue)
      (map int))
    (completing #(assoc! %1 %2 (inc (get %1 %2 0))) persistent!)
    (transient {})
    data))
;; My execution time mean: 405 ms

请注意,这次我们没有将 frequencies 作为外部步骤调用。相反,我们将其融入了操作中。就像 frequencies 所做的一样,我们在瞬态哈希图上完成了操作以获得额外的性能。我们通过使用瞬态哈希图作为种子和 completing 通过调用 persistent! 最终值来做到这一点。

我们可以将其平行化。为了获得最佳性能,我们使用可变的 Java ConcurrentHashMap 而不是不可变的 Clojure 数据结构。

(require '[clojure.core.reducers :as r])
(import '[java.util HashMap Collections Map]
        'java.util.concurrent.atomic.AtomicInteger
        'java.util.concurrent.ConcurrentHashMap)

(quick-bench
  (let [concurrency-level (.availableProcessors (Runtime/getRuntime))
        m (ConcurrentHashMap. (quot (count data) 2) 0.75 concurrency-level)
        combinef (fn ([] m) ([_ _]))  ; just return `m` from the combine step
        rf (fn [^Map m k]
             (let [^AtomicInteger v (or (.get m k) (.putIfAbsent m k (AtomicInteger. 1)))]
               (when v (.incrementAndGet v))
               m))
        reducef ((comp (keep :samplevalue) (map int)) rf)]
    (r/fold combinef reducef data)
    (into {} m)))
;; My execution time mean: 70 ms

这里我们使用 clojure.core.reducers 库中的 fold 来实现并行。请注意,在并行上下文中,使用的任何转换器都必须是无状态的。另请注意 ConcurrentHashMap 不支持使用 nil 作为键或值;幸运的是,我们不需要在这里这样做。

输出在最后被转换成一个不可变的 Clojure 哈希映射。您可以删除该步骤并仅使用 ConcurrentHashMap 实例来获得额外的加速——在我的机器上,删除 into 步骤会使整个 fold 花费大约 26 毫秒。

编辑 2017 年 11 月 20 日: 用户@clojuremostly 正确地指出该答案的早期版本在 [=30= 中调用了 quick-bench ] 初始化并发哈希映射实例的块,这意味着基准测试在其所有运行中使用相同的实例。我将对 quick-bench 的调用移到了 let 块之外。它并没有显着影响结果。