clojure pmap - 为什么我不使用所有内核?

clojure pmap - why aren't i using all the cores?

我正在尝试使用 clojure pantomime 库从大量 tif 文档(以及其他文档)中 extract/ocr 文本。

我的计划是使用 pmap 对一系列输入数据(来自 postgres 数据库)应用映射,然后使用 tika/tesseract OCR 输出更新同一个 postgres 数据库。这一直工作正常,但是我在 htop 中注意到许多内核有时处于空闲状态。

有没有什么办法可以解决这个问题,我可以采取什么步骤来确定为什么这可能会阻塞某个地方?所有处理都发生在单个 tif 文件上,每个线程完全互斥。

附加信息:

  1. 一些 tika/tesseract 进程需要 3 秒,其他进程最多需要 90 秒。一般来说,tika 受 CPU 约束。根据 htop,我有足够的可用内存。
  2. postgres 在会话管理中没有锁定问题,所以我认为这不会阻碍我。
  3. 也许 future 的某个地方正在等待 deref?怎么知道在哪里?

感谢任何提示,谢谢。下面添加了代码。

(defn parse-a-path [{:keys [row_id, file_path]}]
      (try
        (let [
              start        (System/currentTimeMillis)
              mime_type    (pm/mime-type-of file_path)
              file_content (-> file_path (extract/parse) :text)
              language     (pl/detect-language file_content)
              ]
          {:mime_type   mime_type
          :file_content file_content
          :language     language
          :row_id       row_id
          :parse_time_in_seconds   (float (/ ( - (System/currentTimeMillis) start) 100))
          :record_status "doc parsed"})))


(defn fetch-all-batch []
      (t/info (str "Fetching lazy seq. all rows for batch.") )
      (jdbc/query (db-connection)
                  ["select
                   row_id,
                   file_path ,
                   file_extension
                   from the_table" ]))


(defn update-a-row [{:keys [row_id, file_path, file_extension] :as all-keys}]
      (let [parse-out (parse-a-path all-keys )]
        (try
          (doall
            (jdbc/execute!
              (db-connection)
              ["update the_table
               set
               record_last_updated        = current_timestamp ,
               file_content          = ?                 ,
               mime_type             = ?                 ,
               language              = ?                 ,
               parse_time_in_seconds = ?                 ,
               record_status         = ?
               where row_id = ? "
               (:file_content          parse-out) ,
               (:mime_type             parse-out) ,
               (:language              parse-out) ,
               (:parse_time_in_seconds parse-out) ,
               (:record_status         parse-out) ,
               row_id ])
            (t/debug (str "updated row_id " (:row_id parse-out) " (" file_extension ") "
                          " in " (:parse_time_in_seconds parse-out) " seconds." )))
          (catch  Exception _ ))))

(dorun
  (pmap
    #(try
       (update-a-row %)
       (catch Exception e (t/error (.getNextException e)))
       )
    fetch-all-batch )
  )

前段时间我遇到过类似的问题。我猜你和我的假设是一样的:

  • pmap 并行调用 f。但这并不意味着工作是平等分担的。正如您所说,有些需要 3 秒,而另一些需要 90 秒。在 3 秒内完成的线程不会要求其他线程分担一些剩下的工作。所以完成的线程只是空闲等待,直到最后一个线程完成。

  • 您没有准确描述您的数据,但我假设您正在使用某种惰性序列,这不利于并行处理。如果您的进程是 CPU 有界的,并且您可以将整个输入保存在内存中,那么更喜欢使用 clojure.core.reducers('map'、'filter',特别是 'fold')惰性映射、过滤器等的使用。

在我的例子中,这些技巧将处理时间从 34 秒减少到仅仅 8 秒。希望对你有帮助

pmap 在(+ 2 个核心)的批次上并行运行 map 函数,但保留顺序。这意味着如果您有 8 个核心,将处理一批 10 个项目,但只有在所有 10 个都完成后才会开始新的批次。

您可以创建自己的代码,使用 futuredelayderef 的组合,这将是很好的学术练习。之后,您可以扔掉代码并开始使用 claypoole 库,它有一组抽象涵盖了 future.

的大部分用途

对于这种特定情况,使用他们的无序 pmappfor 实现(upmapupfor),它们做的事情完全相同 pmap有但没有订购;一批中的任何一件物品完成后,新物品就会被取走。

在 IO 是主要瓶颈的情况下,或者工作项之间的处理时间可能有很大差异的情况下,并行化 mapfor 操作是最好的方法。

当然,您应该注意不要依赖 return 值的任何排序。

  (require '[com.climate.claypoole :as cp])

  (cp/upmap (cp/ncpus)
    #(try
       (update-a-row %)
       (catch Exception e (t/error (.getNextException e)))
       )
    fetch-all-batch )