Stateful service with lazy sequences and SSE -- how to distribute with fault tolerance?
I wrote a web service that produces estimates of pi, using lazy sequences in Clojure and various infinite-series formulas (Euler, Leibniz). The Clojure service sends these estimates over a Server-Sent Events channel. The current HTML/JS view uses Vue.js to consume the SSE events and display them.
It works well as a single-node service as long as the connection for the SSE channel stays open. But as it stands, if the connection closes or the service dies, it does not persist or back up the reduction state (the position in the infinite series) so it can recover from the failure. Also, because the state is held in the service's local memory (in the Clojure sequence values), there is no horizontal scalability of the kind there would be if, say, the long-lived in-memory state were kept in Redis. As things are, merely adding new nodes would not provide a way to actually divide the work; it would only duplicate the same series. Offloading long-lived state to Redis is the setup I am used to with stateless web services, to simplify strategies for horizontal scaling and fault tolerance.
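For reference, this is the sort of checkpointing I would normally do in a stateless setup, sketched here with the carmine Redis client (the current service does nothing like this; the connection map, key name, and helper names are placeholders):
(require '[taoensso.carmine :as car])
;; Placeholder connection map for a local Redis instance.
(def redis-conn {:pool {} :spec {:uri "redis://localhost:6379"}})
(defn checkpoint-position!
  "Record how many terms of the series have been streamed so far (hypothetical helper)."
  [series-key n]
  (car/wcar redis-conn (car/set series-key n)))
(defn restore-position
  "Read back the last checkpointed term index, defaulting to 0 (hypothetical helper)."
  [series-key]
  (or (some-> (car/wcar redis-conn (car/get series-key)) Long/parseLong) 0))
;; On reconnect, a node could resume by dropping the estimates already delivered, e.g.
;; (drop (restore-position "pi:euler:index") euler-reductions)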
In this stateful scenario, I am somewhat at a loss as to how to scale the Clojure service into a distributed, multi-node solution that can process sequence terms in parallel. Perhaps there could be a dispatching "master" service that delegates sequence ranges to different nodes, receives results from those nodes concurrently (via Redis pub/sub), aggregates them mathematically, and produces the resulting SSE stream for the view? In that case, the master would generate range boundaries from an infinite sequence of numbers spaced roughly a thousand apart, and the parallel nodes could use those boundaries to initialize finite (though possibly still lazy) Clojure sequences? Of course, I would then need to mark which sequence ranges are complete as they come in, and have a retry strategy for cases where a node fails while processing a range.
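To make that concrete, here is a rough sketch of the worker-side partial sum and the master-side aggregation I am imagining, written against the Euler series from the code below (none of these functions exist in the service yet; the names are hypothetical):
;; Hypothetical worker-side helper: sum only the terms in [start, end).
(defn partial-sum
  [term-fn start end]
  (reduce + (map term-fn (range start end))))
;; Hypothetical master-side aggregation: fold the per-range partial sums,
;; received in range order, back into running pi estimates (for the Euler
;; series, pi is approximately the square root of the running sum).
(defn euler-estimates
  [partial-sums]
  (map #(Math/sqrt %) (reductions + partial-sums)))
;; For example, with range boundaries spaced 1000 apart:
;; (euler-estimates
;;   (map #(partial-sum euler-term % (+ % 1000)) (iterate #(+ % 1000) 0)))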
I am studying Kubernetes StatefulSets to get familiar with deployment patterns for stateful services, although I have not yet come across a pattern or solution that fits this particular problem. If this were a stateless service, the Kubernetes solution would be obvious, but the stateful approach leaves me drawing a blank in a Kubernetes context.
Can anyone point me in a good direction for the architecture here? Assuming I do want to keep the state of the series terms encapsulated in Clojure lazy sequences (that is, in local service memory), is my strategy for dividing the work on the right track?
Here is the relevant code for the single-node Clojure service:
(ns server-sent-events.service
  (:require [io.pedestal.http :as http]
            [io.pedestal.http.sse :as sse]
            [io.pedestal.http.route :as route]
            [io.pedestal.http.route.definition :refer [defroutes]]
            [ring.util.response :as ring-resp]
            [clojure.core.async :as async]))
(defn seq-of-terms
  [func]
  (map func (iterate (partial + 1) 0)))
(defn euler-term [n]
  (let [current (+ n 1)]
    (/ 6.0 (* current current))))
; The following is a lazy seq of iterable sums that estimate pi according to
; the Euler series, for increasing numbers of terms in the series.
; Sample usage: (take 100 euler-reductions)
(def euler-reductions
  (map (fn [sum] (Math/sqrt sum)) (reductions + (seq-of-terms euler-term))))
(defn leibniz-term [n] ; starts at zero
  (let [oddnum     (+ (* 2.0 n) 1.0)
        signfactor (- 1 (* 2 (mod n 2)))]
    (/ (* 4.0 signfactor) oddnum)))
; The following is a lazy seq of iterable sums that estimate pi according to
; the Leibniz series, for increasing numbers of terms in the series.
; Sample usage: (take 100 leibniz-reductions)
(def leibniz-reductions (reductions + (seq-of-terms leibniz-term)))
(defn send-result
  [event-ch count-num rdcts]
  (doseq [item rdcts]
    (Thread/sleep 150) ; we must use a naive throttle here to prevent an overflow on the core.async CSP channel, event-ch
    (async/put! event-ch (str item))))
(defn sse-euler-stream-ready
  "Start to send estimates to the client according to the Euler series"
  [event-ch ctx]
  ;; The context is passed into this function.
  (let [{:keys [request response-channel]} ctx
        lazy-list euler-reductions]
    (send-result event-ch 10 lazy-list)))
(defn sse-leibniz-stream-ready
  "Start to send estimates to the client according to the Leibniz series"
  [event-ch ctx]
  (let [{:keys [request response-channel]} ctx
        lazy-list leibniz-reductions]
    (send-result event-ch 10 lazy-list)))
;; Wire root URL to sse event stream
;; with custom event-id setting
(defroutes routes
  [[["/" {:get [::send-result-euler (sse/start-event-stream sse-euler-stream-ready)]}
     ["/euler" {:get [::send-result
                      (sse/start-event-stream sse-euler-stream-ready)]}]
     ["/leibniz" {:get [::send-result-leibniz
                        (sse/start-event-stream sse-leibniz-stream-ready)]}]]]])
(def url-for (route/url-for-routes routes))
(def service {:env :prod
              ::http/routes routes
              ;; Root for resource interceptor that is available by default.
              ::http/resource-path "/public"
              ;; Either :jetty or :tomcat (see comments in project.clj
              ;; to enable Tomcat)
              ::http/type :jetty
              ::http/port 8080
              ;;::http/allowed-origins ["http://127.0.0.1:8081"]
              })
The full code is at https://github.com/wclark-aburra-code/pi-service. It includes the inline Vue.js code that consumes the SSE stream.
If it's only for the sake of scaling, I don't think you need to persist anything. All you need is a dispatching "master" (which could be the client itself) that requests chunked sequences from multiple backends and reassembles them for delivery in the right order.
With core.async, such a dispatching master could be implemented like this:
;; assumes [clojure.core.async :as async] and a logging alias such as
;; [clojure.tools.logging :as log] are required
(let [batch-ch (async/chan)
out-ch (async/chan)]
;; request for 100 batches (or infinite)
(async/onto-chan batch-ch (range 100))
;; consume the result by pushing it back to the sse channel
(async/go-loop []
(when-let [res (async/<! out-ch)]
(log/info ::result res)
(recur)))
;;
;; take each batch number from batch-ch and dispatch it to the backend
;; in parallel. You would also add an exception handler in here.
;;
(async/pipeline-async
;; parallelism
32
;; output
out-ch
;; invoke backend service, this should return immediately
(fn [batch ch]
(let [batch-sz 1000]
(async/go
(let [start (* batch batch-sz)
end (-> batch inc (* batch-sz))]
(log/info ::fetching-from-service start end)
;; simulate a slow service
(async/<! (async/timeout 1000))
;; push the result back to the pipeline and close the channel
;; (here I just return the term itself)
(async/onto-chan ch (range start end))))))
;; input ;;
batch-ch))
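To wire this back into your SSE handler, you could fold the ordered results coming off out-ch into running estimates and push them onto the event channel. A minimal sketch, assuming each backend call returns a single partial sum for its range (rather than the raw terms I return above) and that event-ch is the channel your sse-*-stream-ready handler receives:
;; Fold ordered per-range partial sums into running pi estimates and stream
;; them out over SSE. Assumes the Euler series, where pi is approximately
;; the square root of the running sum.
(async/go-loop [running-sum 0.0]
  (when-let [batch-sum (async/<! out-ch)]
    (let [total    (+ running-sum batch-sum)
          estimate (Math/sqrt total)]
      (async/>! event-ch (str estimate))
      (recur total))))
Because pipeline-async preserves the order of its inputs on the output channel, the partial sums arrive in range order, so the running sum stays consistent even though the backends run in parallel.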