为什么 pmap|reducers/map 没有使用所有 cpu 核心?
Why is pmap|reducers/map not using all cpu cores?
我正在尝试解析一个包含一百万行的文件,每一行都是一个 json 字符串,其中包含关于一本书的一些信息(作者、内容等)。我正在使用 iota to load the file, as my program throws an OutOfMemoryError
if I try to use slurp
. I'm also using cheshire 来解析字符串。该程序只是加载一个文件并计算所有书籍中的所有单词。
我的第一次尝试包括 pmap
来完成繁重的工作,我认为这基本上会利用我所有的 cpu 核心。
(ns multicore-parsing.core
(:require [cheshire.core :as json]
[iota :as io]
[clojure.string :as string]
[clojure.core.reducers :as r]))
(defn words-pmap
[filename]
(letfn [(parse-with-keywords [str]
(json/parse-string str true))
(words [book]
(string/split (:contents book) #"\s+"))]
(->>
(io/vec filename)
(pmap parse-with-keywords)
(pmap words)
(r/reduce #(apply conj %1 %2) #{})
(count))))
虽然它似乎确实使用了所有内核,但每个内核很少使用超过其容量的 50%,我猜这与 pmap 的批量大小有关,所以我偶然发现了 relatively old question一些评论参考了 clojure.core.reducers
库。
我决定使用 reducers/map
:
重写函数
(defn words-reducers
[filename]
(letfn [(parse-with-keywords [str]
(json/parse-string str true))
(words [book]
(string/split (:contents book) #"\s+"))]
(->>
(io/vec filename)
(r/map parse-with-keywords)
(r/map words)
(r/reduce #(apply conj %1 %2) #{})
(count))))
但是cpu使用情况更糟,与之前的实现相比,它甚至需要更长的时间才能完成:
multicore-parsing.core=> (time (words-pmap "./dummy_data.txt"))
"Elapsed time: 20899.088919 msecs"
546
multicore-parsing.core=> (time (words-reducers "./dummy_data.txt"))
"Elapsed time: 28790.976455 msecs"
546
我做错了什么? mmap loading + reducers 是解析大文件的正确方法吗?
编辑:this 是我正在使用的文件。
EDIT2:这是 iota/seq
而不是 iota/vec
的时间安排:
multicore-parsing.core=> (time (words-reducers "./dummy_data.txt"))
"Elapsed time: 160981.224565 msecs"
546
multicore-parsing.core=> (time (words-pmap "./dummy_data.txt"))
"Elapsed time: 160296.482722 msecs"
546
我不认为 reducer 会是适合您的解决方案,因为它们根本不能很好地处理惰性序列(reducer 会通过惰性序列给出正确的结果,但不会很好地平行化)。
你可能想看看这个 sample code from the book Seven Concurrency Models in Seven Weeks(免责声明:我是作者)它解决了一个类似的问题(计算每个词在维基百科上出现的次数)。
给定维基百科页面列表,此函数按顺序计算单词数(get-words
returns 页面中的单词序列):
(defn count-words-sequential [pages]
(frequencies (mapcat get-words pages)))
这是一个使用 pmap
的并行版本,它 运行 更快,但只快了大约 1.5 倍:
(defn count-words-parallel [pages]
(reduce (partial merge-with +)
(pmap #(frequencies (get-words %)) pages)))
它只快 1.5 倍左右的原因是因为 reduce
成为瓶颈——它为每个页面调用一次 (partial merge-with +)
。合并 100 个页面的批次可将 4 核机器上的性能提高到 3.2 倍左右:
(defn count-words [pages]
(reduce (partial merge-with +)
(pmap count-words-sequential (partition-all 100 pages))))
我正在尝试解析一个包含一百万行的文件,每一行都是一个 json 字符串,其中包含关于一本书的一些信息(作者、内容等)。我正在使用 iota to load the file, as my program throws an OutOfMemoryError
if I try to use slurp
. I'm also using cheshire 来解析字符串。该程序只是加载一个文件并计算所有书籍中的所有单词。
我的第一次尝试包括 pmap
来完成繁重的工作,我认为这基本上会利用我所有的 cpu 核心。
(ns multicore-parsing.core
(:require [cheshire.core :as json]
[iota :as io]
[clojure.string :as string]
[clojure.core.reducers :as r]))
(defn words-pmap
[filename]
(letfn [(parse-with-keywords [str]
(json/parse-string str true))
(words [book]
(string/split (:contents book) #"\s+"))]
(->>
(io/vec filename)
(pmap parse-with-keywords)
(pmap words)
(r/reduce #(apply conj %1 %2) #{})
(count))))
虽然它似乎确实使用了所有内核,但每个内核很少使用超过其容量的 50%,我猜这与 pmap 的批量大小有关,所以我偶然发现了 relatively old question一些评论参考了 clojure.core.reducers
库。
我决定使用 reducers/map
:
(defn words-reducers
[filename]
(letfn [(parse-with-keywords [str]
(json/parse-string str true))
(words [book]
(string/split (:contents book) #"\s+"))]
(->>
(io/vec filename)
(r/map parse-with-keywords)
(r/map words)
(r/reduce #(apply conj %1 %2) #{})
(count))))
但是cpu使用情况更糟,与之前的实现相比,它甚至需要更长的时间才能完成:
multicore-parsing.core=> (time (words-pmap "./dummy_data.txt"))
"Elapsed time: 20899.088919 msecs"
546
multicore-parsing.core=> (time (words-reducers "./dummy_data.txt"))
"Elapsed time: 28790.976455 msecs"
546
我做错了什么? mmap loading + reducers 是解析大文件的正确方法吗?
编辑:this 是我正在使用的文件。
EDIT2:这是 iota/seq
而不是 iota/vec
的时间安排:
multicore-parsing.core=> (time (words-reducers "./dummy_data.txt"))
"Elapsed time: 160981.224565 msecs"
546
multicore-parsing.core=> (time (words-pmap "./dummy_data.txt"))
"Elapsed time: 160296.482722 msecs"
546
我不认为 reducer 会是适合您的解决方案,因为它们根本不能很好地处理惰性序列(reducer 会通过惰性序列给出正确的结果,但不会很好地平行化)。
你可能想看看这个 sample code from the book Seven Concurrency Models in Seven Weeks(免责声明:我是作者)它解决了一个类似的问题(计算每个词在维基百科上出现的次数)。
给定维基百科页面列表,此函数按顺序计算单词数(get-words
returns 页面中的单词序列):
(defn count-words-sequential [pages]
(frequencies (mapcat get-words pages)))
这是一个使用 pmap
的并行版本,它 运行 更快,但只快了大约 1.5 倍:
(defn count-words-parallel [pages]
(reduce (partial merge-with +)
(pmap #(frequencies (get-words %)) pages)))
它只快 1.5 倍左右的原因是因为 reduce
成为瓶颈——它为每个页面调用一次 (partial merge-with +)
。合并 100 个页面的批次可将 4 核机器上的性能提高到 3.2 倍左右:
(defn count-words [pages]
(reduce (partial merge-with +)
(pmap count-words-sequential (partition-all 100 pages))))