Clojure - Parsing Elasticsearch query response and extracting values

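I want to parse an Elasticsearch query response and convert it into my own format. The response can contain nested buckets, and the nesting depth varies from query to query. Here is a simplified version of a result: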

{:bucket-aggregation
 {:buckets
  [{:key "outer_bucket"
    :bucket-aggregation
    {:buckets
     [{:key "inner_bucket_1"
       :bucket-aggregation
       {:buckets
        [{:key 1510657200000, :sum {:value 25}}
         {:key 1510660800000, :sum {:value 50}}]}}
      {:key "inner_bucket_2"
       :bucket-aggregation
       {:buckets
        [{:key 1510657200000, :sum {:value 30}}
         {:key 1510660800000, :sum {:value 35}}]}}
      {:key "inner_bucket_3"
       :bucket-aggregation
       {:buckets
        [{:key 1510657200000, :sum {:value 40}}
         {:key 1510660800000, :sum {:value 45}}]}}]}}]}}

I would like to extract each :value, together with the path of :key values leading to it, into a structure like this:

[{:key ["outer_bucket" "inner_bucket_1" 1510657200000], :value 25}
 {:key ["outer_bucket" "inner_bucket_1" 1510660800000], :value 50}
 {:key ["outer_bucket" "inner_bucket_2" 1510657200000], :value 30}
 {:key ["outer_bucket" "inner_bucket_2" 1510660800000], :value 35}
 {:key ["outer_bucket" "inner_bucket_3" 1510657200000], :value 40}
 {:key ["outer_bucket" "inner_bucket_3" 1510660800000], :value 45}]

Any suggestions on how I should approach this?

Edit: simplified the desired format

If you are willing to add a library, you can do this with specter:

; assume your data is bound with `(def data ...)`
(require '[com.rpl.specter :refer [select ALL collect-one]])
(select [:bucket-aggregation :buckets ALL (collect-one :key) ; TODO: extract that recurring path
         :bucket-aggregation :buckets ALL (collect-one :key)
         :bucket-aggregation :buckets ALL (collect-one :key)
         :sum :value]
        data)
; => [["outer_bucket" "inner_bucket_1" 1510657200000 25]
; =>  ["outer_bucket" "inner_bucket_1" 1510660800000 50]
; =>  ["outer_bucket" "inner_bucket_2" 1510657200000 30]
; =>  ["outer_bucket" "inner_bucket_2" 1510660800000 35]
; =>  ["outer_bucket" "inner_bucket_3" 1510657200000 40]
; =>  ["outer_bucket" "inner_bucket_3" 1510660800000 45]]

From here it is just a bit of reshaping:

(map (fn [[k1 k2 k3 v]] {:key [k1 k2 k3] :value v}) (select ...))
; => ({:key ["outer_bucket" "inner_bucket_1" 1510657200000], :value 25}
; =>  {:key ["outer_bucket" "inner_bucket_1" 1510660800000], :value 50}
; =>  {:key ["outer_bucket" "inner_bucket_2" 1510657200000], :value 30}
; =>  {:key ["outer_bucket" "inner_bucket_2" 1510660800000], :value 35}
; =>  {:key ["outer_bucket" "inner_bucket_3" 1510657200000], :value 40}
; =>  {:key ["outer_bucket" "inner_bucket_3" 1510660800000], :value 45})
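The destructuring in that reshaping step is tied to exactly three key levels. A minimal sketch of a variant that works for any number of collected keys, assuming each row is a sequence whose last element is the value (`row->entry` and `rows` are hypothetical names used for illustration, not part of specter):

```clojure
;; Hypothetical helper: treat the last element of a row as the value
;; and everything before it as the key path.
(defn row->entry [row]
  {:key (vec (butlast row)) :value (last row)})

;; Sample rows in the shape of the select output shown earlier.
(def rows
  [["outer_bucket" "inner_bucket_1" 1510657200000 25]
   ["outer_bucket" "inner_bucket_1" 1510660800000 50]])

(mapv row->entry rows)
;; => [{:key ["outer_bucket" "inner_bucket_1" 1510657200000], :value 25}
;;     {:key ["outer_bucket" "inner_bucket_1" 1510660800000], :value 50}]
```

This only removes the fixed arity from the reshaping; the select path itself still lists one level per nesting depth.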

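Here is another approach using clojure.walk/postwalk. It does not assume a fixed nesting depth, so it also handles inputs that are nested more shallowly or more deeply.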

;; assumes the response map is bound to `result`
(require 'clojure.walk)
(clojure.walk/postwalk
  (fn [v]
    (cond
      ;; deepest case, pull up sum value
      (and (map? v) (:key v) (:sum v))
      {:key [(:key v)], :value (get-in v [:sum :value])}
      ;; pull up unnecessary buckets map wrapper
      (and (map? v) (:buckets v))
      (flatten (:buckets v))
      ;; select outer bucket + inner buckets
      (and (map? v) (:key v) (:bucket-aggregation v))
      (let [outer-key (:key v)
            buckets (:bucket-aggregation v)]
        (map #(update % :key (fn [k] (into [outer-key] k))) buckets))
      ;; pass-through
      :else v))
  (:bucket-aggregation result))
=>
({:key ["outer_bucket" "inner_bucket_1" 1510657200000], :value 25}
 {:key ["outer_bucket" "inner_bucket_1" 1510660800000], :value 50}
 {:key ["outer_bucket" "inner_bucket_2" 1510657200000], :value 30}
 {:key ["outer_bucket" "inner_bucket_2" 1510660800000], :value 35}
 {:key ["outer_bucket" "inner_bucket_3" 1510657200000], :value 40}
 {:key ["outer_bucket" "inner_bucket_3" 1510660800000], :value 45})
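The same depth-independent idea can also be sketched as a plain recursive function using only clojure.core. This is a sketch under assumptions, not a definitive implementation: `flatten-buckets` and `sample` are hypothetical names, every aggregation level is assumed to live under :bucket-aggregation, and every leaf bucket is assumed to carry its number at [:sum :value].

```clojure
;; Hypothetical helper: walk the nested buckets recursively, carrying the
;; keys seen so far in `path`.
(defn flatten-buckets
  ([node] (flatten-buckets [] node))
  ([path node]
   (if-let [buckets (get-in node [:bucket-aggregation :buckets])]
     ;; inner level: descend into each bucket, extending the key path
     (mapcat (fn [b] (flatten-buckets (conj path (:key b)) b)) buckets)
     ;; leaf bucket: emit a single entry
     [{:key path :value (get-in node [:sum :value])}])))

;; A shallower input than the one in the question (two levels instead of three).
(def sample
  {:bucket-aggregation
   {:buckets
    [{:key "outer_bucket"
      :bucket-aggregation
      {:buckets
       [{:key 1510657200000, :sum {:value 25}}
        {:key 1510660800000, :sum {:value 50}}]}}]}})

(vec (flatten-buckets sample))
;; => [{:key ["outer_bucket" 1510657200000], :value 25}
;;     {:key ["outer_bucket" 1510660800000], :value 50}]
```

Unlike the postwalk version, this is called on the whole response map rather than on its :bucket-aggregation value. A map with no :bucket-aggregation at all is treated as a leaf, so an empty response would yield one entry with an empty key and a nil value; guard for that case if it can occur.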