如何在 Clojure 中执行并行事务
How to execute parallel transactions in Clojure
我有一系列需要并行处理的客户。我试图为此使用 pmap
。结果非常缓慢,比顺序实施慢得多。内部函数 process-customer
有一个事务。显然,pmap 会立即启动所有事务,并且它们最终会重试以降低性能。最好的并行化方法是什么?
(defn process-customers [customers]
(doall
(pmap
(fn [sub-customers]
(doseq [customer sub-customers]
(process-customer customer)))
(partition-all 10 customers))))
编辑:
process-customer
函数涉及以下步骤。为了简洁起见,我写了这些步骤。所有步骤都在一个交易中,以确保另一个并行交易不会导致负库存等不一致。
(defn- process-customer [customer]
"Process `customer`. Consists of three steps:
1. Finding all stores in which the requested products are still available.
2. Sorting the found stores to find the cheapest (for the sum of all products).
3. Buying the products by updating the `stock`.
)
编辑 2: process-customers
的以下版本与上面的并行 process-customers
具有相同的性能。下面显然是顺序的。
(defn process-customers [customers]
"Process `customers` one by one. In this code, this happens sequentially."
(doseq [customer customers]
(process-customer customer)))
我假设您的交易在 process-customer
的整个生命周期内锁定库存。这将是缓慢的,因为所有顾客都在争相购买同一家商店。如果您可以将该过程分为两个阶段:1) 报价和 2) 仅在 (2) 上完成和应用交易,那么性能应该会好得多。或者,如果您购买 agent
编程,您将在消息级别为您自动定义事务边界。这是您可以考虑的一个示例:
(defn get-best-deal
"Returns the best deal for a given order with given stores (agent)"
[stores order]
;;
;; request for quotation from 1000 stores (in parallel)
;;
(doseq [store stores]
(send store get-quote order))
;;
;; wait for reply, up to 0.5s
;;
(apply await-for 500 stores)
;;
;; sort and find the best store
;;
(when-let [best-store (->> stores
(filter (fn [store] (get-in @store [:quotes order])))
(sort-by (fn [store] (->> (get-in @store [:quotes order])
vals
(reduce +))))
first)]
{:best-store best-store
:invoice-id (do
;; execute the order
(send best-store fulfill order)
;; wait for the transaction to complete
(await best-store)
;; get an invoice id
(get-in @best-store [:invoices order]))}))
并从 1,000 家商店中找到 100 种产品的 100 个订单(总共 289 个订单项)的最优惠价格:
(->> orders
(pmap (partial get-best-deal stores))
(filter :invoice-id)
count
time)
;; => 57
;; "Elapsed time: 312.002328 msecs"
示例业务逻辑:
(defn get-quote
"issue a quote by checking inventory"
[store {:keys [order-items] :as order}]
(if-let [quote (->> order-items
(reduce reduce-inventory
{:store store
:quote nil})
:quote)]
;; has inventory to generate a quote
(assoc-in store [:quotes order] quote)
;; no inventory
(update store :quotes dissoc order)))
(defn fulfill
"fulfill an order if previuosly quoted"
[store order]
(if-let [quote (get-in store [:quotes order])]
;; check inventory again and generate invoice
(let [[invoice inventory'] (check-inventory-and-generate-invoice store order)]
(cond-> store
invoice (->
;; register invoice
(assoc-in [:invoices order] invoice)
;; invalidate the quote
(update :quotes dissoc order)
;; update inventory
(assoc :inventory inventory'))))
;; not quoted before
store))
我有一系列需要并行处理的客户。我试图为此使用 pmap
。结果非常缓慢,比顺序实施慢得多。内部函数 process-customer
有一个事务。显然,pmap 会立即启动所有事务,并且它们最终会重试以降低性能。最好的并行化方法是什么?
(defn process-customers [customers]
(doall
(pmap
(fn [sub-customers]
(doseq [customer sub-customers]
(process-customer customer)))
(partition-all 10 customers))))
编辑:
process-customer
函数涉及以下步骤。为了简洁起见,我写了这些步骤。所有步骤都在一个交易中,以确保另一个并行交易不会导致负库存等不一致。
(defn- process-customer [customer]
"Process `customer`. Consists of three steps:
1. Finding all stores in which the requested products are still available.
2. Sorting the found stores to find the cheapest (for the sum of all products).
3. Buying the products by updating the `stock`.
)
编辑 2: process-customers
的以下版本与上面的并行 process-customers
具有相同的性能。下面显然是顺序的。
(defn process-customers [customers]
"Process `customers` one by one. In this code, this happens sequentially."
(doseq [customer customers]
(process-customer customer)))
我假设您的交易在 process-customer
的整个生命周期内锁定库存。这将是缓慢的,因为所有顾客都在争相购买同一家商店。如果您可以将该过程分为两个阶段:1) 报价和 2) 仅在 (2) 上完成和应用交易,那么性能应该会好得多。或者,如果您购买 agent
编程,您将在消息级别为您自动定义事务边界。这是您可以考虑的一个示例:
(defn get-best-deal
"Returns the best deal for a given order with given stores (agent)"
[stores order]
;;
;; request for quotation from 1000 stores (in parallel)
;;
(doseq [store stores]
(send store get-quote order))
;;
;; wait for reply, up to 0.5s
;;
(apply await-for 500 stores)
;;
;; sort and find the best store
;;
(when-let [best-store (->> stores
(filter (fn [store] (get-in @store [:quotes order])))
(sort-by (fn [store] (->> (get-in @store [:quotes order])
vals
(reduce +))))
first)]
{:best-store best-store
:invoice-id (do
;; execute the order
(send best-store fulfill order)
;; wait for the transaction to complete
(await best-store)
;; get an invoice id
(get-in @best-store [:invoices order]))}))
并从 1,000 家商店中找到 100 种产品的 100 个订单(总共 289 个订单项)的最优惠价格:
(->> orders
(pmap (partial get-best-deal stores))
(filter :invoice-id)
count
time)
;; => 57
;; "Elapsed time: 312.002328 msecs"
示例业务逻辑:
(defn get-quote
"issue a quote by checking inventory"
[store {:keys [order-items] :as order}]
(if-let [quote (->> order-items
(reduce reduce-inventory
{:store store
:quote nil})
:quote)]
;; has inventory to generate a quote
(assoc-in store [:quotes order] quote)
;; no inventory
(update store :quotes dissoc order)))
(defn fulfill
"fulfill an order if previuosly quoted"
[store order]
(if-let [quote (get-in store [:quotes order])]
;; check inventory again and generate invoice
(let [[invoice inventory'] (check-inventory-and-generate-invoice store order)]
(cond-> store
invoice (->
;; register invoice
(assoc-in [:invoices order] invoice)
;; invalidate the quote
(update :quotes dissoc order)
;; update inventory
(assoc :inventory inventory'))))
;; not quoted before
store))