clojure pmap - 为什么我不使用所有内核?
clojure pmap - why aren't i using all the cores?
我正在尝试使用 clojure pantomime
库从大量 tif
文档(以及其他文档)中 extract/ocr 文本。
我的计划是使用 pmap 对一系列输入数据(来自 postgres 数据库)应用映射,然后使用 tika/tesseract OCR 输出更新同一个 postgres 数据库。这一直工作正常,但是我在 htop 中注意到许多内核有时处于空闲状态。
有没有什么办法可以解决这个问题,我可以采取什么步骤来确定为什么这可能会阻塞某个地方?所有处理都发生在单个 tif 文件上,每个线程完全互斥。
附加信息:
- 一些 tika/tesseract 进程需要 3 秒,其他进程最多需要 90 秒。一般来说,tika 受 CPU 约束。根据
htop
,我有足够的可用内存。
- postgres 在会话管理中没有锁定问题,所以我认为这不会阻碍我。
- 也许
future
的某个地方正在等待 deref
?怎么知道在哪里?
感谢任何提示,谢谢。下面添加了代码。
(defn parse-a-path [{:keys [row_id, file_path]}]
(try
(let [
start (System/currentTimeMillis)
mime_type (pm/mime-type-of file_path)
file_content (-> file_path (extract/parse) :text)
language (pl/detect-language file_content)
]
{:mime_type mime_type
:file_content file_content
:language language
:row_id row_id
:parse_time_in_seconds (float (/ ( - (System/currentTimeMillis) start) 100))
:record_status "doc parsed"})))
(defn fetch-all-batch []
(t/info (str "Fetching lazy seq. all rows for batch.") )
(jdbc/query (db-connection)
["select
row_id,
file_path ,
file_extension
from the_table" ]))
(defn update-a-row [{:keys [row_id, file_path, file_extension] :as all-keys}]
(let [parse-out (parse-a-path all-keys )]
(try
(doall
(jdbc/execute!
(db-connection)
["update the_table
set
record_last_updated = current_timestamp ,
file_content = ? ,
mime_type = ? ,
language = ? ,
parse_time_in_seconds = ? ,
record_status = ?
where row_id = ? "
(:file_content parse-out) ,
(:mime_type parse-out) ,
(:language parse-out) ,
(:parse_time_in_seconds parse-out) ,
(:record_status parse-out) ,
row_id ])
(t/debug (str "updated row_id " (:row_id parse-out) " (" file_extension ") "
" in " (:parse_time_in_seconds parse-out) " seconds." )))
(catch Exception _ ))))
(dorun
(pmap
#(try
(update-a-row %)
(catch Exception e (t/error (.getNextException e)))
)
fetch-all-batch )
)
前段时间我遇到过类似的问题。我猜你和我的假设是一样的:
pmap 并行调用 f。但这并不意味着工作是平等分担的。正如您所说,有些需要 3 秒,而另一些需要 90 秒。在 3 秒内完成的线程不会要求其他线程分担一些剩下的工作。所以完成的线程只是空闲等待,直到最后一个线程完成。
您没有准确描述您的数据,但我假设您正在使用某种惰性序列,这不利于并行处理。如果您的进程是 CPU 有界的,并且您可以将整个输入保存在内存中,那么更喜欢使用 clojure.core.reducers('map'、'filter',特别是 'fold')惰性映射、过滤器等的使用。
在我的例子中,这些技巧将处理时间从 34 秒减少到仅仅 8 秒。希望对你有帮助
pmap
在(+ 2 个核心)的批次上并行运行 map 函数,但保留顺序。这意味着如果您有 8 个核心,将处理一批 10 个项目,但只有在所有 10 个都完成后才会开始新的批次。
您可以创建自己的代码,使用 future
、delay
和 deref
的组合,这将是很好的学术练习。之后,您可以扔掉代码并开始使用 claypoole 库,它有一组抽象涵盖了 future
.
的大部分用途
对于这种特定情况,使用他们的无序 pmap
或 pfor
实现(upmap
和 upfor
),它们做的事情完全相同 pmap
有但没有订购;一批中的任何一件物品完成后,新物品就会被取走。
在 IO 是主要瓶颈的情况下,或者工作项之间的处理时间可能有很大差异的情况下,并行化 map
或 for
操作是最好的方法。
当然,您应该注意不要依赖 return 值的任何排序。
(require '[com.climate.claypoole :as cp])
(cp/upmap (cp/ncpus)
#(try
(update-a-row %)
(catch Exception e (t/error (.getNextException e)))
)
fetch-all-batch )
我正在尝试使用 clojure pantomime
库从大量 tif
文档(以及其他文档)中 extract/ocr 文本。
我的计划是使用 pmap 对一系列输入数据(来自 postgres 数据库)应用映射,然后使用 tika/tesseract OCR 输出更新同一个 postgres 数据库。这一直工作正常,但是我在 htop 中注意到许多内核有时处于空闲状态。
有没有什么办法可以解决这个问题,我可以采取什么步骤来确定为什么这可能会阻塞某个地方?所有处理都发生在单个 tif 文件上,每个线程完全互斥。
附加信息:
- 一些 tika/tesseract 进程需要 3 秒,其他进程最多需要 90 秒。一般来说,tika 受 CPU 约束。根据
htop
,我有足够的可用内存。 - postgres 在会话管理中没有锁定问题,所以我认为这不会阻碍我。
- 也许
future
的某个地方正在等待deref
?怎么知道在哪里?
感谢任何提示,谢谢。下面添加了代码。
(defn parse-a-path [{:keys [row_id, file_path]}]
(try
(let [
start (System/currentTimeMillis)
mime_type (pm/mime-type-of file_path)
file_content (-> file_path (extract/parse) :text)
language (pl/detect-language file_content)
]
{:mime_type mime_type
:file_content file_content
:language language
:row_id row_id
:parse_time_in_seconds (float (/ ( - (System/currentTimeMillis) start) 100))
:record_status "doc parsed"})))
(defn fetch-all-batch []
(t/info (str "Fetching lazy seq. all rows for batch.") )
(jdbc/query (db-connection)
["select
row_id,
file_path ,
file_extension
from the_table" ]))
(defn update-a-row [{:keys [row_id, file_path, file_extension] :as all-keys}]
(let [parse-out (parse-a-path all-keys )]
(try
(doall
(jdbc/execute!
(db-connection)
["update the_table
set
record_last_updated = current_timestamp ,
file_content = ? ,
mime_type = ? ,
language = ? ,
parse_time_in_seconds = ? ,
record_status = ?
where row_id = ? "
(:file_content parse-out) ,
(:mime_type parse-out) ,
(:language parse-out) ,
(:parse_time_in_seconds parse-out) ,
(:record_status parse-out) ,
row_id ])
(t/debug (str "updated row_id " (:row_id parse-out) " (" file_extension ") "
" in " (:parse_time_in_seconds parse-out) " seconds." )))
(catch Exception _ ))))
(dorun
(pmap
#(try
(update-a-row %)
(catch Exception e (t/error (.getNextException e)))
)
fetch-all-batch )
)
前段时间我遇到过类似的问题。我猜你和我的假设是一样的:
pmap 并行调用 f。但这并不意味着工作是平等分担的。正如您所说,有些需要 3 秒,而另一些需要 90 秒。在 3 秒内完成的线程不会要求其他线程分担一些剩下的工作。所以完成的线程只是空闲等待,直到最后一个线程完成。
您没有准确描述您的数据,但我假设您正在使用某种惰性序列,这不利于并行处理。如果您的进程是 CPU 有界的,并且您可以将整个输入保存在内存中,那么更喜欢使用 clojure.core.reducers('map'、'filter',特别是 'fold')惰性映射、过滤器等的使用。
在我的例子中,这些技巧将处理时间从 34 秒减少到仅仅 8 秒。希望对你有帮助
pmap
在(+ 2 个核心)的批次上并行运行 map 函数,但保留顺序。这意味着如果您有 8 个核心,将处理一批 10 个项目,但只有在所有 10 个都完成后才会开始新的批次。
您可以创建自己的代码,使用 future
、delay
和 deref
的组合,这将是很好的学术练习。之后,您可以扔掉代码并开始使用 claypoole 库,它有一组抽象涵盖了 future
.
对于这种特定情况,使用他们的无序 pmap
或 pfor
实现(upmap
和 upfor
),它们做的事情完全相同 pmap
有但没有订购;一批中的任何一件物品完成后,新物品就会被取走。
在 IO 是主要瓶颈的情况下,或者工作项之间的处理时间可能有很大差异的情况下,并行化 map
或 for
操作是最好的方法。
当然,您应该注意不要依赖 return 值的任何排序。
(require '[com.climate.claypoole :as cp])
(cp/upmap (cp/ncpus)
#(try
(update-a-row %)
(catch Exception e (t/error (.getNextException e)))
)
fetch-all-batch )