将数据流式传输到 JVM 中的调用方

Streaming data to the caller in JVM

我有一个函数可以定期获取数据然后停止获取数据。此函数必须 return 定期向函数的调用者获取数据

  1. 当它得到时
  2. 一次拍摄

第二个是一个简单的实现,即您阻止调用者,获取所有数据,然后一次性发送。

但我想实现第一个(我想避免回调)。流是这里要用的东西吗?如果是这样,如何?如果没有,我如何 return something 调用者可以查询数据并在 return 发出没有更多数据的信号时停止?

注意:我在 JVM 生态系统上,具体来说是 clojure。我看过 clojure 库 core.async,它解决了此类使用通道的问题。但我在想是否还有其他可能看起来像这样的方式(假设可以使用流)。
Java 片段

//Function which will periodically fetch MyData until there is no data
public Stream<MyData> myFunction() {
...
}

myFunction().filter(myData -> myData.text.equals("foo"))

您可能需要查看 https://github.com/ReactiveX/RxJava and https://github.com/ReactiveX/RxClojure(似乎不再维护?)

也许你可以只使用 seq - 默认情况下它是惰性的(如 Stream),因此调用者可以决定何时拉入数据。当没有更多数据时 myFunction 可以简单地结束序列。在执行此操作时,您还将在 myFunction 中封装一些优化 - 例如批量获取数据以最小化往返。或者根据您的原始要求定期获取数据。

这是一个简单的实现:

(defn my-function []
  (let [batch 100]
    (->> (range)
         (map #(let [from (* batch %)
                     to   (+ from batch)]
                  (db-get from to)))
         ;; take while we have data from db-get
         (take-while identity)
         ;; returns as one single seq/Stream
         (apply concat))))

;; use it as a normal seq/Stream
(->> (my-function)
     (filter odd?))

其中 db-get 类似于:

(defn db-get [from to]
  ;; return first 1000 records only, i.e. returns nil to signal completion
  (when (< from 1000)
    ;; returns a range of records
    (range from to)))