在 clojure 中操作包含 ref 集合的原子
manipulating an atom containing a ref collection in clojure
我有一个应用程序应该在客户指定的预算范围内为其预订航班。因此,我有客户数据和可用航班数据。然后我按如下方式在 Clojure 中开发解决方案。
首先,我创建一个航班原子:
(def flights
(atom []))
然后我创建了一个函数来将航班初始化为一个包含引用集合的原子。在这里,我传递了包含在 post.
下方的航班数据
(defn initialize-flights [initial-flights]
(reset! flights (map ref initial-flights)))
然后我通过 process-customers 函数处理客户,如下所示。这就是真正令人困惑的地方。
(defn process-customers [customers]
(doseq [customer1 (partitionCustomerInput N-THREADS customers)]
(doseq [customer2 customer1]
(swap! flights
(fn [flights_collection]
(if-let [updated-flight (process-customer flights_collection customer2)]
(assoc flights (:id updated-flight) updated-flight)
flights_collection)))))
(reset! finished-processing? true))
在流程客户内部,我将航班收集传递给流程客户(注意流程客户是流程客户的辅助函数,它们不是同一个函数)。 Flights-collection 此时是航班参考流程的集合-客户应该在列表中进行搜索,如果客户符合其中的航班资格,它会使用预订功能来编辑航班。我应该如何将航班收集传递给流程客户?实际上,流程客户不会搜索航班参考,也不会更改航班参考?
下面是 process-customer 函数及其辅助函数。
(defn- process-customer [flights customer]
"Try to book a flight from `flights` for `customer`, returning the updated
flight if found, or nil if no suitable flight was found."
(if-let [{:keys [flight price]} (find-flight flights customer)]
(let [updated-flight (book flight price (:seats customer))]
(log "Customer" (:id customer) "booked" (:seats customer)
"seats on flight" (:id updated-flight) "at $" price " (< budget of $"
(:budget customer) ").")
updated-flight)
(do
(log "Customer" (:id customer) "did not find a flight.")
nil)))
(defn filter-pricing-with-n-seats [pricing seats]
"Get `pricing` for which there are at least `seats` empty seats available."
(filter #(>= (second %) seats) pricing))
(defn lowest-available-price [flight seats]
"Returns the lowest price in `flight` for which at least `seats` empty seats
are available, or nil if none found."
(-> (:pricing flight) ; [[price available taken]]
(filter-pricing-with-n-seats seats)
(sort-pricing)
(first) ; [price available taken]
(first))) ; price
(defn- find-flight [flights customer]
"Find a flight in `flights` that is on the route and within the budget of
`customer`. If a flight was found, returns {:flight flight :price price},
else returns nil."
(let [{:keys [_id from to seats budget]}
customer
flights-and-prices
; flights that are on the route and within budget, and their price
(for [f flights
:when (and (= (:from f) from) (= (:to f) to))
:let [lowest-price (lowest-available-price f seats)]
:when (and (some? lowest-price) (<= lowest-price budget))]
{:flight f :price lowest-price})
cheapest-flight-and-price
(first (sort-by :price flights-and-prices))]
cheapest-flight-and-price))
(defn- book [flight price seats]
"Updates `flight` to book `seats` at `price`."
(update flight :pricing
(fn [pricing]
(for [[p a t] pricing]
(if (= p price)
[p (- a seats) (+ t seats)]
[p a t])))))
(def finished-processing?
"Set to true once all customers have been processed, so that sales process
can end."
(atom false))
(defn partitionCustomerInput
[threads customers]
(let [partitions (partition-all
(Math/ceil (/ (count customers) threads)) customers)]
partitions))
下面是主要功能。它初始化航班并启动客户处理
(defn main []
(initialize-flights input/flights)
(let [f1 (future (time (process-customers input/customers)))
@f1
)
(println "Flights:")
(print-flights (map deref @flights)))
(main)
(shutdown-agents)
以下是客户和航班合集。
(def flights
[{:id 0
:from "BRU" :to "ATL"
:carrier "Delta"
:pricing [[600 150 0] ; price; # seats available at that price; # seats taken at that price
[650 50 0]
[700 50 0]
[800 50 0]]}
{:id 1
:from "BRU" :to "LON"
:carrier "Brussels Airlines"
:pricing [[300 150 0]
[350 50 0]
[370 20 0]
[380 30 0]]}
{:id 2
:from "BRU" :to "LON"
:carrier "Brussels Airlines"
:pricing [[250 100 0]
[300 50 0]]}
{:id 3
:from "BRU" :to "MAD"
:carrier "Brussels Airlines"
:pricing [[200 150 0]
[250 50 0]
[300 100 0]]}
{:id 4
:from "BRU" :to "MAD"
:carrier "Iberia"
:pricing [[250 150 0]
[300 50 0]]}])
(def customers
[{:id 0 :from "BRU" :to "ATL" :seats 5 :budget 700}
{:id 1 :from "BRU" :to "ATL" :seats 5 :budget 550}
{:id 2 :from "BRU" :to "LON" :seats 6 :budget 270}
{:id 3 :from "BRU" :to "ATL" :seats 4 :budget 600}
{:id 4 :from "BRU" :to "LON" :seats 3 :budget 270}
{:id 5 :from "BRU" :to "LON" :seats 9 :budget 250}
{:id 6 :from "BRU" :to "MAD" :seats 5 :budget 200}
{:id 7 :from "BRU" :to "MAD" :seats 9 :budget 150}
{:id 8 :from "BRU" :to "LON" :seats 5 :budget 250}
{:id 9 :from "BRU" :to "ATL" :seats 4 :budget 500}
{:id 10 :from "BRU" :to "MAD" :seats 1 :budget 180}
{:id 11 :from "BRU" :to "LON" :seats 2 :budget 320}
{:id 12 :from "BRU" :to "ATL" :seats 3 :budget 850}
{:id 13 :from "BRU" :to "ATL" :seats 4 :budget 200}])
另外,请注意,我想在此实现中使用 refs 来更改航班,因为 ref 支持协调读取和写入以自动更改航班。我的目标是为这个应用制定一个高度并行的解决方案,不能容忍冲突。
我认为您需要一个 ref 而不是顶层的 atom。看来您需要协调对个别航班的更改和对航班列表的更改。如果一个线程正在修改航班而另一个线程将其从列表中删除怎么办?您的 process-customer
副作用必须在 (dosync)
.
内完成
性能方面,它应该没问题,因为如果你不在事务中修改你的航班列表引用,它不会导致其他事务改变它重试。
另一个原因是因为您违反了 swap!
的一个非常重要的规则。传递给 swap!
的函数必须没有副作用,因为它可以被 STM 重试。更改 ref 是副作用,可能会导致难以理解的错误。
所以我会做类似的事情
(def flights
(ref [(ref {:id "flight-1"})
(ref {:id "flight-2"})]))
;; Run a bunch of these in their own threads...
(doseq [customer partitioned-customers]
(dosync (process-customer customer flights)))
然后您可以使用 alter
、commute
和 ensure
微调 process-customer 以最大化并发性和最小化重试。
希望这对您有所帮助,祝您好运!
我有一个应用程序应该在客户指定的预算范围内为其预订航班。因此,我有客户数据和可用航班数据。然后我按如下方式在 Clojure 中开发解决方案。
首先,我创建一个航班原子:
(def flights
(atom []))
然后我创建了一个函数来将航班初始化为一个包含引用集合的原子。在这里,我传递了包含在 post.
下方的航班数据(defn initialize-flights [initial-flights]
(reset! flights (map ref initial-flights)))
然后我通过 process-customers 函数处理客户,如下所示。这就是真正令人困惑的地方。
(defn process-customers [customers]
(doseq [customer1 (partitionCustomerInput N-THREADS customers)]
(doseq [customer2 customer1]
(swap! flights
(fn [flights_collection]
(if-let [updated-flight (process-customer flights_collection customer2)]
(assoc flights (:id updated-flight) updated-flight)
flights_collection)))))
(reset! finished-processing? true))
在流程客户内部,我将航班收集传递给流程客户(注意流程客户是流程客户的辅助函数,它们不是同一个函数)。 Flights-collection 此时是航班参考流程的集合-客户应该在列表中进行搜索,如果客户符合其中的航班资格,它会使用预订功能来编辑航班。我应该如何将航班收集传递给流程客户?实际上,流程客户不会搜索航班参考,也不会更改航班参考?
下面是 process-customer 函数及其辅助函数。
(defn- process-customer [flights customer]
"Try to book a flight from `flights` for `customer`, returning the updated
flight if found, or nil if no suitable flight was found."
(if-let [{:keys [flight price]} (find-flight flights customer)]
(let [updated-flight (book flight price (:seats customer))]
(log "Customer" (:id customer) "booked" (:seats customer)
"seats on flight" (:id updated-flight) "at $" price " (< budget of $"
(:budget customer) ").")
updated-flight)
(do
(log "Customer" (:id customer) "did not find a flight.")
nil)))
(defn filter-pricing-with-n-seats [pricing seats]
"Get `pricing` for which there are at least `seats` empty seats available."
(filter #(>= (second %) seats) pricing))
(defn lowest-available-price [flight seats]
"Returns the lowest price in `flight` for which at least `seats` empty seats
are available, or nil if none found."
(-> (:pricing flight) ; [[price available taken]]
(filter-pricing-with-n-seats seats)
(sort-pricing)
(first) ; [price available taken]
(first))) ; price
(defn- find-flight [flights customer]
"Find a flight in `flights` that is on the route and within the budget of
`customer`. If a flight was found, returns {:flight flight :price price},
else returns nil."
(let [{:keys [_id from to seats budget]}
customer
flights-and-prices
; flights that are on the route and within budget, and their price
(for [f flights
:when (and (= (:from f) from) (= (:to f) to))
:let [lowest-price (lowest-available-price f seats)]
:when (and (some? lowest-price) (<= lowest-price budget))]
{:flight f :price lowest-price})
cheapest-flight-and-price
(first (sort-by :price flights-and-prices))]
cheapest-flight-and-price))
(defn- book [flight price seats]
"Updates `flight` to book `seats` at `price`."
(update flight :pricing
(fn [pricing]
(for [[p a t] pricing]
(if (= p price)
[p (- a seats) (+ t seats)]
[p a t])))))
(def finished-processing?
"Set to true once all customers have been processed, so that sales process
can end."
(atom false))
(defn partitionCustomerInput
[threads customers]
(let [partitions (partition-all
(Math/ceil (/ (count customers) threads)) customers)]
partitions))
下面是主要功能。它初始化航班并启动客户处理
(defn main []
(initialize-flights input/flights)
(let [f1 (future (time (process-customers input/customers)))
@f1
)
(println "Flights:")
(print-flights (map deref @flights)))
(main)
(shutdown-agents)
以下是客户和航班合集。
(def flights
[{:id 0
:from "BRU" :to "ATL"
:carrier "Delta"
:pricing [[600 150 0] ; price; # seats available at that price; # seats taken at that price
[650 50 0]
[700 50 0]
[800 50 0]]}
{:id 1
:from "BRU" :to "LON"
:carrier "Brussels Airlines"
:pricing [[300 150 0]
[350 50 0]
[370 20 0]
[380 30 0]]}
{:id 2
:from "BRU" :to "LON"
:carrier "Brussels Airlines"
:pricing [[250 100 0]
[300 50 0]]}
{:id 3
:from "BRU" :to "MAD"
:carrier "Brussels Airlines"
:pricing [[200 150 0]
[250 50 0]
[300 100 0]]}
{:id 4
:from "BRU" :to "MAD"
:carrier "Iberia"
:pricing [[250 150 0]
[300 50 0]]}])
(def customers
[{:id 0 :from "BRU" :to "ATL" :seats 5 :budget 700}
{:id 1 :from "BRU" :to "ATL" :seats 5 :budget 550}
{:id 2 :from "BRU" :to "LON" :seats 6 :budget 270}
{:id 3 :from "BRU" :to "ATL" :seats 4 :budget 600}
{:id 4 :from "BRU" :to "LON" :seats 3 :budget 270}
{:id 5 :from "BRU" :to "LON" :seats 9 :budget 250}
{:id 6 :from "BRU" :to "MAD" :seats 5 :budget 200}
{:id 7 :from "BRU" :to "MAD" :seats 9 :budget 150}
{:id 8 :from "BRU" :to "LON" :seats 5 :budget 250}
{:id 9 :from "BRU" :to "ATL" :seats 4 :budget 500}
{:id 10 :from "BRU" :to "MAD" :seats 1 :budget 180}
{:id 11 :from "BRU" :to "LON" :seats 2 :budget 320}
{:id 12 :from "BRU" :to "ATL" :seats 3 :budget 850}
{:id 13 :from "BRU" :to "ATL" :seats 4 :budget 200}])
另外,请注意,我想在此实现中使用 refs 来更改航班,因为 ref 支持协调读取和写入以自动更改航班。我的目标是为这个应用制定一个高度并行的解决方案,不能容忍冲突。
我认为您需要一个 ref 而不是顶层的 atom。看来您需要协调对个别航班的更改和对航班列表的更改。如果一个线程正在修改航班而另一个线程将其从列表中删除怎么办?您的 process-customer
副作用必须在 (dosync)
.
性能方面,它应该没问题,因为如果你不在事务中修改你的航班列表引用,它不会导致其他事务改变它重试。
另一个原因是因为您违反了 swap!
的一个非常重要的规则。传递给 swap!
的函数必须没有副作用,因为它可以被 STM 重试。更改 ref 是副作用,可能会导致难以理解的错误。
所以我会做类似的事情
(def flights
(ref [(ref {:id "flight-1"})
(ref {:id "flight-2"})]))
;; Run a bunch of these in their own threads...
(doseq [customer partitioned-customers]
(dosync (process-customer customer flights)))
然后您可以使用 alter
、commute
和 ensure
微调 process-customer 以最大化并发性和最小化重试。
希望这对您有所帮助,祝您好运!