将数据流式传输到 JVM 中的调用方
Streaming data to the caller in JVM
我有一个函数可以定期获取数据然后停止获取数据。此函数必须 return 定期向函数的调用者获取数据
- 当它得到时
- 一次拍摄
第二个是一个简单的实现,即您阻止调用者,获取所有数据,然后一次性发送。
但我想实现第一个(我想避免回调)。流是这里要用的东西吗?如果是这样,如何?如果没有,我如何 return something
调用者可以查询数据并在 return 发出没有更多数据的信号时停止?
注意:我在 JVM 生态系统上,具体来说是 clojure。我看过 clojure 库 core.async
,它解决了此类使用通道的问题。但我在想是否还有其他可能看起来像这样的方式(假设可以使用流)。
Java 片段
//Function which will periodically fetch MyData until there is no data
public Stream<MyData> myFunction() {
...
}
myFunction().filter(myData -> myData.text.equals("foo"))
您可能需要查看 https://github.com/ReactiveX/RxJava and https://github.com/ReactiveX/RxClojure(似乎不再维护?)
也许你可以只使用 seq
- 默认情况下它是惰性的(如 Stream),因此调用者可以决定何时拉入数据。当没有更多数据时 myFunction
可以简单地结束序列。在执行此操作时,您还将在 myFunction
中封装一些优化 - 例如批量获取数据以最小化往返。或者根据您的原始要求定期获取数据。
这是一个简单的实现:
(defn my-function []
(let [batch 100]
(->> (range)
(map #(let [from (* batch %)
to (+ from batch)]
(db-get from to)))
;; take while we have data from db-get
(take-while identity)
;; returns as one single seq/Stream
(apply concat))))
;; use it as a normal seq/Stream
(->> (my-function)
(filter odd?))
其中 db-get
类似于:
(defn db-get [from to]
;; return first 1000 records only, i.e. returns nil to signal completion
(when (< from 1000)
;; returns a range of records
(range from to)))
我有一个函数可以定期获取数据然后停止获取数据。此函数必须 return 定期向函数的调用者获取数据
- 当它得到时
- 一次拍摄
第二个是一个简单的实现,即您阻止调用者,获取所有数据,然后一次性发送。
但我想实现第一个(我想避免回调)。流是这里要用的东西吗?如果是这样,如何?如果没有,我如何 return something
调用者可以查询数据并在 return 发出没有更多数据的信号时停止?
注意:我在 JVM 生态系统上,具体来说是 clojure。我看过 clojure 库 core.async
,它解决了此类使用通道的问题。但我在想是否还有其他可能看起来像这样的方式(假设可以使用流)。
Java 片段
//Function which will periodically fetch MyData until there is no data
public Stream<MyData> myFunction() {
...
}
myFunction().filter(myData -> myData.text.equals("foo"))
您可能需要查看 https://github.com/ReactiveX/RxJava and https://github.com/ReactiveX/RxClojure(似乎不再维护?)
也许你可以只使用 seq
- 默认情况下它是惰性的(如 Stream),因此调用者可以决定何时拉入数据。当没有更多数据时 myFunction
可以简单地结束序列。在执行此操作时,您还将在 myFunction
中封装一些优化 - 例如批量获取数据以最小化往返。或者根据您的原始要求定期获取数据。
这是一个简单的实现:
(defn my-function []
(let [batch 100]
(->> (range)
(map #(let [from (* batch %)
to (+ from batch)]
(db-get from to)))
;; take while we have data from db-get
(take-while identity)
;; returns as one single seq/Stream
(apply concat))))
;; use it as a normal seq/Stream
(->> (my-function)
(filter odd?))
其中 db-get
类似于:
(defn db-get [from to]
;; return first 1000 records only, i.e. returns nil to signal completion
(when (< from 1000)
;; returns a range of records
(range from to)))