如何将 IReduceInit 从 next.jdbc 调整为使用 cheshire 将 JSON 流式传输到使用 ring 的 HTTP 响应
How to adapt the IReduceInit from next.jdbc to stream JSON using cheshire to a HTTP response using ring
tl;dr 如何将 IReduceInit 转换为转换值的惰性序列
我有一个数据库查询,它产生一个相当大的数据集,用于在客户端进行实时旋转(百万或两行,25 个属性 - 对于现代笔记本电脑来说没问题)。
我的(简化的)堆栈是调用 clojure.jdbc 来获得(我认为是懒惰的)结果行序列。我可以通过 ring-json 中间件将它作为正文传递出去来序列化它。 ring-json 在堆上构建响应字符串时存在问题,但从 0.5.0 开始有一个选项可以流式传输响应。
我通过分析几个失败案例发现实际上 clojure.jdbc 在返回之前在内存中实现了整个结果集。没问题!我没有使用那个库中的 reducible-query
,而是决定迁移到新的 next.jdbc.
next.jdbc 中的关键操作是 plan
,其中 returns 一个 IReduceInit,我可以使用它来 运行 查询并获取结果集...
(into [] (map :cc_id) (jdbc/plan ds ["select cc_id from organisation where cc_id = '675192'"]))
["675192"]
然而,这实现了整个结果集,在上面的例子中,我会预先在内存中给我所有的 ID。对一个人来说不是问题,但我通常有很多人。
计划 IReduceInit 是一个我可以减少的东西,如果我给一个起始值,所以我可以在减少函数中进行输出...(thx @amalloy)
(reduce #(println (:cc_id %2)) [] (jdbc/plan ds ["select cc_id from organisation where cc_id = '675192'"]))
675192
nil
...但理想情况下,我想在对它们应用转换函数后将这个 IReduceInit 变成值的惰性序列,这样我就可以将它们与 ring-json 和 cheshire 一起使用。我没有看到任何明显的方法。
reduce
适用于 IReduceInit。 IReduceInit 需要一个初始值,该值是您在对其调用 .reduce 时指定的,但在使用 reduce 函数时则不需要;这解释了为什么您看到一个在工作,而另一个在工作。
但是,这不会为您提供惰性序列。 reduce
契约的一部分是它急切地消耗整个输入(我们将忽略 reduced
这不会改变任何有意义的东西)。您的问题是更普遍的动态范围问题的具体情况: JDBC 产生的序列在某些上下文中仅是 "valid" ,您需要在该上下文中进行所有处理,因此它可以偷懒吧。相反,您通常会彻底改变您的程序:不要将 return 值用作序列,而是向查询引擎传递一个函数并说 "please call this function with your results"。然后引擎确保数据在调用该函数时有效,一旦函数 returns 它就会清理数据。我不知道 jdbc.next,但对于较旧的 jdbc,您会为此使用 db-query-with-resultset
之类的东西。您将向它传递一些可以将字节添加到挂起的 HTTP 响应的函数,并且它会多次调用该函数。
这有点含糊,因为我不知道您使用的是什么 HTTP 处理程序,或者它用于非延迟处理流式响应的工具是什么,但这是您必须遵循的一般想法如果你想处理一个动态范围的资源:惰性不是一个选项。
令人沮丧。
为什么你不能用 JDBC 来做呢?没有任何 Clojure 层?
(let [resultset (.executeQuery connection "select ...")]
(loop
(when (.next resultset)
(let [row [(.getString resultset 1)
(.getString resultset 2)
...]])
(json/send row)
(recur)))
(json/end))
当然,使用 ResultSetMetaData,您可以自动将行生成到一个函数中,该函数可以处理返回的任何内容。
IReduceInit 使 JDBC 资源在 reduce 函数退出时结束。
这比 LazySeq 方法更可预测,后者可能永远不会释放 JDBC 资源。
您使用 BlockingQueue 和未来的任务来像这样填充该队列
(defn lazywalk-reducible
"walks the reducible in chunks of size n,
returns an iterable that permits access"
[n reducible]
(reify java.lang.Iterable
(iterator [_]
(let [bq (java.util.concurrent.ArrayBlockingQueue. n)
finished? (volatile! false)
traverser (future (reduce (fn [_ v] (.put bq v)) nil reducible)
(vreset! finished? true))]
(reify java.util.Iterator
(hasNext [_] (or (false? @finished?) (false? (.isEmpty bq))))
(next [_] (.take bq)))))))
如果生成迭代器但未遵循其结论,这当然会造成泄漏。
我没有仔细测试过,可能还有其他问题;但这种方法应该有效。
如果 Java Iterable 对您的用例来说不够好,您也可以将其具体化 clojure.lang.ISeq
;但是随后您开始遇到 HeadRetention 问题;以及如何处理对 Object first()
的调用,这是非常可行的,但我不想考虑太多
我的 lazy-seq 是个坏主意的原因有很多 - 即使我保证不会保持头脑,结果流期间的异常问题无疑会使 ResultSet 闲置 - 序列化会发生在可以清理的调用堆栈之外。
懒惰的需要是由不想在内存中实现整个结果的愿望驱动的,需要 seq 或其他 coll?是为了让中间件将其序列化...
因此,直接将IReduceInit做成JSONable,然后绕过中间件。如果在序列化过程中出现异常,控件将通过 next.jdbc 的 IReduceInit,然后可以进行有意义的清理。
;; reuse this body generator from my patch to ring.middleware.json directly, as the coll? check will fail
(defrecord JsonStreamingResponseBody [body options]
ring-protocols/StreamableResponseBody
(write-body-to-stream [_ _ output-stream]
(json/generate-stream body (io/writer output-stream) options)))
;; the year long yak is shaved in 8 lines by providing a custom serialiser for IReduceInits…
(extend-type IReduceInit
cheshire.generate/JSONable
(to-json [^IReduceInit results ^JsonGenerator jg]
(.writeStartArray jg)
(let [rf (fn [_ ^IPersistentMap m]
(cheshire.generate/encode-map m jg))]
(reduce rf nil results))
(.writeEndArray jg)))
;; at this point I can wrap the result from next.jdbc/plan with ->JsonStreamingResponseBody into the :body of the ring response and it will stream
编写这些功能仍然感觉工作量很大,适配器代码总是让我担心我缺少一种简单、惯用的方法。
tl;dr 如何将 IReduceInit 转换为转换值的惰性序列
我有一个数据库查询,它产生一个相当大的数据集,用于在客户端进行实时旋转(百万或两行,25 个属性 - 对于现代笔记本电脑来说没问题)。
我的(简化的)堆栈是调用 clojure.jdbc 来获得(我认为是懒惰的)结果行序列。我可以通过 ring-json 中间件将它作为正文传递出去来序列化它。 ring-json 在堆上构建响应字符串时存在问题,但从 0.5.0 开始有一个选项可以流式传输响应。
我通过分析几个失败案例发现实际上 clojure.jdbc 在返回之前在内存中实现了整个结果集。没问题!我没有使用那个库中的 reducible-query
,而是决定迁移到新的 next.jdbc.
next.jdbc 中的关键操作是 plan
,其中 returns 一个 IReduceInit,我可以使用它来 运行 查询并获取结果集...
(into [] (map :cc_id) (jdbc/plan ds ["select cc_id from organisation where cc_id = '675192'"]))
["675192"]
然而,这实现了整个结果集,在上面的例子中,我会预先在内存中给我所有的 ID。对一个人来说不是问题,但我通常有很多人。
计划 IReduceInit 是一个我可以减少的东西,如果我给一个起始值,所以我可以在减少函数中进行输出...(thx @amalloy)
(reduce #(println (:cc_id %2)) [] (jdbc/plan ds ["select cc_id from organisation where cc_id = '675192'"]))
675192
nil
...但理想情况下,我想在对它们应用转换函数后将这个 IReduceInit 变成值的惰性序列,这样我就可以将它们与 ring-json 和 cheshire 一起使用。我没有看到任何明显的方法。
reduce
适用于 IReduceInit。 IReduceInit 需要一个初始值,该值是您在对其调用 .reduce 时指定的,但在使用 reduce 函数时则不需要;这解释了为什么您看到一个在工作,而另一个在工作。
但是,这不会为您提供惰性序列。 reduce
契约的一部分是它急切地消耗整个输入(我们将忽略 reduced
这不会改变任何有意义的东西)。您的问题是更普遍的动态范围问题的具体情况: JDBC 产生的序列在某些上下文中仅是 "valid" ,您需要在该上下文中进行所有处理,因此它可以偷懒吧。相反,您通常会彻底改变您的程序:不要将 return 值用作序列,而是向查询引擎传递一个函数并说 "please call this function with your results"。然后引擎确保数据在调用该函数时有效,一旦函数 returns 它就会清理数据。我不知道 jdbc.next,但对于较旧的 jdbc,您会为此使用 db-query-with-resultset
之类的东西。您将向它传递一些可以将字节添加到挂起的 HTTP 响应的函数,并且它会多次调用该函数。
这有点含糊,因为我不知道您使用的是什么 HTTP 处理程序,或者它用于非延迟处理流式响应的工具是什么,但这是您必须遵循的一般想法如果你想处理一个动态范围的资源:惰性不是一个选项。
令人沮丧。
为什么你不能用 JDBC 来做呢?没有任何 Clojure 层?
(let [resultset (.executeQuery connection "select ...")]
(loop
(when (.next resultset)
(let [row [(.getString resultset 1)
(.getString resultset 2)
...]])
(json/send row)
(recur)))
(json/end))
当然,使用 ResultSetMetaData,您可以自动将行生成到一个函数中,该函数可以处理返回的任何内容。
IReduceInit 使 JDBC 资源在 reduce 函数退出时结束。 这比 LazySeq 方法更可预测,后者可能永远不会释放 JDBC 资源。
您使用 BlockingQueue 和未来的任务来像这样填充该队列
(defn lazywalk-reducible
"walks the reducible in chunks of size n,
returns an iterable that permits access"
[n reducible]
(reify java.lang.Iterable
(iterator [_]
(let [bq (java.util.concurrent.ArrayBlockingQueue. n)
finished? (volatile! false)
traverser (future (reduce (fn [_ v] (.put bq v)) nil reducible)
(vreset! finished? true))]
(reify java.util.Iterator
(hasNext [_] (or (false? @finished?) (false? (.isEmpty bq))))
(next [_] (.take bq)))))))
如果生成迭代器但未遵循其结论,这当然会造成泄漏。
我没有仔细测试过,可能还有其他问题;但这种方法应该有效。
如果 Java Iterable 对您的用例来说不够好,您也可以将其具体化 clojure.lang.ISeq
;但是随后您开始遇到 HeadRetention 问题;以及如何处理对 Object first()
的调用,这是非常可行的,但我不想考虑太多
我的 lazy-seq 是个坏主意的原因有很多 - 即使我保证不会保持头脑,结果流期间的异常问题无疑会使 ResultSet 闲置 - 序列化会发生在可以清理的调用堆栈之外。
懒惰的需要是由不想在内存中实现整个结果的愿望驱动的,需要 seq 或其他 coll?是为了让中间件将其序列化...
因此,直接将IReduceInit做成JSONable,然后绕过中间件。如果在序列化过程中出现异常,控件将通过 next.jdbc 的 IReduceInit,然后可以进行有意义的清理。
;; reuse this body generator from my patch to ring.middleware.json directly, as the coll? check will fail
(defrecord JsonStreamingResponseBody [body options]
ring-protocols/StreamableResponseBody
(write-body-to-stream [_ _ output-stream]
(json/generate-stream body (io/writer output-stream) options)))
;; the year long yak is shaved in 8 lines by providing a custom serialiser for IReduceInits…
(extend-type IReduceInit
cheshire.generate/JSONable
(to-json [^IReduceInit results ^JsonGenerator jg]
(.writeStartArray jg)
(let [rf (fn [_ ^IPersistentMap m]
(cheshire.generate/encode-map m jg))]
(reduce rf nil results))
(.writeEndArray jg)))
;; at this point I can wrap the result from next.jdbc/plan with ->JsonStreamingResponseBody into the :body of the ring response and it will stream
编写这些功能仍然感觉工作量很大,适配器代码总是让我担心我缺少一种简单、惯用的方法。