如何从 compojure API 流式传输大型 CSV 响应,以便整个响应不会立即保存在内存中?

How to stream a large CSV response from a compojure API so that the whole response is not held in memory at once?

我是 compojure 的新手,但到目前为止一直很喜欢使用它。我是 目前在我的 API 端点之一中遇到问题 来自数据库的大型 CSV 文件,然后将其作为响应正文传递。

我似乎遇到的问题是整个 CSV 文件都被保留了下来 在内存中,然后在 API 中导致内存不足错误。是什么 处理和生成它的最佳方法,最好是 gzip 文件?可能吗 流式传输响应以便一次 return 编辑几千行?什么时候 我return一个JSON response body 一样的数据,没有问题returning 这个。

这是我用于 return 的当前代码:

(defn complete
  "Returns metrics for each completed benchmark instance"
  [db-client response-format]
  (let [benchmarks  (completed-benchmark-metrics {} db-client)]
    (case response-format
      :json  (json-grouped-output field-mappings benchmarks)
      :csv   (csv-output benchmarks))))

(defn csv-output [data-seq]
  (let [header (map name (keys (first data-seq)))
        out    (java.io.StringWriter.)
        write  #(csv/write-csv out (list %))]
    (write header)
    (dorun (map (comp write vals) data-seq))
    (.toString out)))

data-seq 是从数据库中 return 得到的结果,我认为这是一个 惰性序列。我正在使用 yesql 执行数据库调用。

这是我为这个 API 端点编写的资源:

(defresource results-complete [db]
  :available-media-types  ["application/json" "text/csv"]
  :allowed-methods        [:get]
  :handle-ok              (fn [request]
                            (let [response-format (keyword (get-in request [:request :params :format] :json))
                                  disposition     (str "attachment; filename=\"nucleotides_benchmark_metrics." (name response-format) "\"")
                                  response        {:headers {"Content-Type" (content-types response-format)
                                                             "Content-Disposition" disposition}
                                                   :body    (results/complete db response-format)}]
                              (ring-response response))))

你的csv-output函数完全实现了数据集,转成字符串。要延迟流式传输数据,您需要 return 除了具体数据类型(如字符串)之外的其他内容。 This suggests ring supports returning a stream, that can be lazily realised by Jetty. The answer to this question 可能有用。

我也在努力处理大型 csv 文件的流式传输。我的解决方案是使用 httpkit-channeldata-seq 的每一行流式传输到客户端,然后关闭通道。我的解决方案是这样的:

[org.httpkit.server :refer :all]

(fn handler [req]
    (with-channel req channel (let [header "your$header"
                                    data-seq ["your$seq-data"]]
                                    (doseq [line (cons header data-seq)]
                                       (send! channel
                                              {:status  200
                                              :headers {"Content-Type" "text/csv"}
                                              :body    (str line "\n")}
                                              false))
                                    (close channel))))

多亏了此线程中提供的所有建议,我才能够使用 piped-input-stream:

创建一个解决方案
(defn csv-output [data-seq]
  (let [headers     (map name (keys (first data-seq)))
        rows        (map vals data-seq)
        stream-csv  (fn [out] (csv/write-csv out (cons headers rows))
                              (.flush out))]
    (piped-input-stream #(stream-csv (io/make-writer % {})))))

这与我的解决方案不同,因为它不使用 dorun 实现序列,也不创建大型 String 对象。这改为异步写入 PipedInputStream 连接 as described by the documentation:

Create an input stream from a function that takes an output stream as its argument. The function will be executed in a separate thread. The stream will be automatically closed after the function finishes.