具有惰性序列和 SSE 的有状态服务——如何进行容错分配?

stateful service with lazy sequences and SSE -- how to distribute with fault tolerance?

我编写了一个 Web 服务来生成 Pi 的估计值,使用 Clojure 中的惰性序列和各种无限级数公式(Euler、Leibniz)。 Clojure 服务通过 Server-Sent Events 通道发送这些估计。当前 HTML/JS 视图正在使用 Vue.js 来使用 SSE 事件并显示它们。

只要 SSE 通道的连接未关闭,它作为具有单个节点的服务就可以很好地工作。但截至目前,如果连接关闭或服务终止,它不会坚持或备份减少的状态(无限系列中的位置)以从故障中恢复。此外,由于状态包含在服务的本地内存中(在 Clojure 序列值中),因此没有水平可扩展性,例如,如果长期内存状态存在于 Redis 中,就会存在这种情况。在这种情况下,仅添加新节点不会提供实际划分工作的方法——它只会复制相同的系列。使用 Redis 卸载长期内存状态是我习惯于使用无状态 Web 服务的设置,以简化水平扩展和容错策略。

在这种有状态的情况下,对于如何使用可以并行处理序列项的分布式多节点解决方案来扩展 Clojure 服务,我有点不知所措。也许可以有一个调度 "master" 服务将序列范围委托给不同的节点,并发地从节点接收结果(通过 Redis pub/sub),以数学方式聚合它们并为视图生成结果 SSE 流?在那种情况下,主服务将使用间隔大约一千的无限系列数字来产生范围边界,并行节点可以使用它来初始化非无限 Clojure 序列(可能仍然是惰性的)?当然,在这种情况下,我需要在它们进入时标记哪些序列范围是完整的,并在处理范围期间节点故障的情况下使用重试策略。

我正在研究 Kubernetes Stateful Sets 以熟悉有状态服务的部署模式,尽管我还没有遇到适合这个特定问题的模式或解决方案。如果这是一个无状态服务,Kubernetes 解决方案会很明显,但是有状态方法让我在 Kubernetes 环境中一片空白。

任何人都可以为我指明此处架构的良好方向吗?假设我确实希望将系列术语的状态封装在 Clojure 惰性序列中(即,在本地服务内存中),我的工作划分策略是否正确?

这里是单节点Clojure服务的相关代码:

(ns server-sent-events.service
  (:require [io.pedestal.http :as http]
            [io.pedestal.http.sse :as sse]
            [io.pedestal.http.route :as route]
            [io.pedestal.http.route.definition :refer [defroutes]]
            [ring.util.response :as ring-resp]
            [clojure.core.async :as async]
  )
)

(defn seq-of-terms
   [func]
   (map func (iterate (partial + 1) 0))
)

(defn euler-term [n]
  (let [current (+ n 1)] (/ 6.0 (* current current)))
)

; The following returns a lazy list representing iterable sums that estimate pi
; according to the Euler series for increasing amounts of terms in the series.
; Sample usage: (take 100 euler-reductions)
(def euler-reductions
  (map (fn [sum] (Math/sqrt sum))  (reductions + (seq-of-terms euler-term) ))
)

(defn leibniz-term [n] ; starts at zero
   (let [
          oddnum (+ (* 2.0 n) 1.0)
          signfactor (- 1 (* 2 (mod n 2)))
        ]
        (/ (* 4.0 signfactor) oddnum)
  )
)

; The following returns a lazy list representing iterable sums that estimate pi
; according to the Leibniz series for increasing amounts of terms in the series.
; Sample usage: (take 100 leibniz-reductions)
(def leibniz-reductions (reductions + (seq-of-terms leibniz-term)))

(defn send-result
  [event-ch count-num rdcts]
  (doseq [item rdcts]
    (Thread/sleep 150) ; we must use a naive throttle here to prevent an overflow on the core.async CSP channel, event-ch
    (async/put! event-ch (str item))
  )
)

(defn sse-euler-stream-ready
  "Start to send estimates to the client according to the Euler series"
  [event-ch ctx]
  ;; The context is passed into this function.
  (let
    [
      {:keys [request response-channel]} ctx
      lazy-list euler-reductions
    ]
    (send-result event-ch 10 lazy-list)
  )
)

(defn sse-leibniz-stream-ready
  "Start to send estimates to the client according to the Leibniz series"
  [event-ch ctx]
  (let
    [
      {:keys [request response-channel]} ctx
      lazy-list leibniz-reductions
    ]
    (send-result event-ch 10 lazy-list)
  )
)


;; Wire root URL to sse event stream
;; with custom event-id setting
(defroutes routes
  [[["/" {:get [::send-result-euler (sse/start-event-stream sse-euler-stream-ready)]}
    ["/euler" {:get [::send-result
                    (sse/start-event-stream sse-euler-stream-ready)]}]
    ["/leibniz" {:get [::send-result-leibniz
                      (sse/start-event-stream sse-leibniz-stream-ready)]}]
    ]]])

(def url-for (route/url-for-routes routes))

(def service {:env :prod
              ::http/routes routes
              ;; Root for resource interceptor that is available by default.
              ::http/resource-path "/public"
              ;; Either :jetty or :tomcat (see comments in project.clj
              ;; to enable Tomcat)
              ::http/type :jetty
              ::http/port 8080
              ;;::http/allowed-origins ["http://127.0.0.1:8081"]
              }
)

完整代码位于 https://github.com/wclark-aburra-code/pi-service。包含内联 Vue.js 代码,它使用 SSE 流。

如果只是为了缩放,我觉得你不需要坚持什么。您所需要的只是一个调度 "master"(可能是客户端本身)以从多个后端请求分块序列并重新组合它们以按正确的顺序交付。

使用core.async,一个调度master可以这样实现:

(let [batch-ch (async/chan)
      out-ch   (async/chan)]

  ;; request for 100 batches (or infinite)
  (async/onto-chan batch-ch (range 100))
  ;; consume the result by pushing it back to the sse channel
  (async/go-loop []
    (when-let [res (async/<! out-ch)]
      (log/info ::result res)
      (recur)))

  ;;
  ;; take each batch number from batch-ch and dispatch it to the backend
  ;; in parallel. You would also add an exception handler in here.
  ;;
  (async/pipeline-async
   ;; parallelism
   32
   ;; output
   out-ch
   ;; invoke backend service, this should return immediately
   (fn [batch ch]
     (let [batch-sz 1000]
       (async/go
         (let [start (* batch batch-sz)
               end   (-> batch inc (* batch-sz))]
           (log/info ::fetching-from-service start end)
           ;; simulate a slow service
           (async/<! (async/timeout 1000))
           ;; push the result back to the pipeline and close the channel
           ;; (here I just return the term itself)
           (async/onto-chan ch (range start end))))))
   ;; input  ;;
   batch-ch))