Elasticsearch Aggregations: Volume-Weighted Average Price

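I need to plot the Volume-Weighted Average Price (VWAP) of trades, each of which has a price_per_unit and a quantity, over a given time range.

As the result of the aggregation, each bucket of the date_histogram should contain the VWAP of all trades that have occurred up to and including that bucket.

I'm not sure whether this is possible with Elasticsearch, or what the right approach would be (scripts, perhaps?).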

The basic mapping of the trade document is simple (quantity is the number of units):

"trade": {
  "properties": {
    "trade_id": {"type": "string", "index": "not_analyzed"},
    "product_id": {"type": "string", "index": "not_analyzed"},
    "quantity": {"type": "double"},
    "execution_time": {"type": "date"},
    "price_per_unit": {"type": "double"}
  }
}

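execution_time should be used for the date_histogram, and the total price of a trade is the product of price_per_unit and quantity. Therefore VWAP = sum(price_per_unit * quantity) / sum(quantity), where both sums run over all trades up to the end of the bucket.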
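
To make the expected numbers concrete, here is a minimal client-side sketch in Python of the cumulative VWAP per day, useful as a reference to check the aggregation against. It assumes UTC timestamps in the "%Y-%m-%dT%H:%M:%SZ" form used by the sample data, and only reports days that actually have trades; the function name cumulative_vwap_per_day is hypothetical.

```python
from datetime import datetime

# Sample trades mirroring the bulk request in this post:
# (execution_time, quantity, price_per_unit)
TRADES = [
    ("2016-11-18T22:45:27Z", 10, 5), ("2016-11-18T22:45:27Z", 10, 5),
    ("2016-11-19T22:45:27Z", 10, 5), ("2016-11-20T22:45:27Z", 10, 5),
    ("2016-11-20T22:45:27Z", 10, 5), ("2016-11-20T22:45:27Z", 10, 5),
    ("2016-11-21T22:45:27Z", 10, 5), ("2016-11-21T22:45:27Z", 10, 5),
]


def cumulative_vwap_per_day(trades):
    """Return {day: VWAP of all trades up to and including that day}."""
    # Per-day traded value and quantity (what a daily date_histogram buckets).
    daily = {}
    for ts, qty, price in trades:
        day = datetime.strptime(ts, "%Y-%m-%dT%H:%M:%SZ").date().isoformat()
        value, volume = daily.get(day, (0.0, 0.0))
        daily[day] = (value + price * qty, volume + qty)

    # Running sums in day order (what cumulative_sum does), then divide.
    result = {}
    cum_value = cum_volume = 0.0
    for day in sorted(daily):
        value, volume = daily[day]
        cum_value += value
        cum_volume += volume
        result[day] = cum_value / cum_volume if cum_volume else None
    return result


if __name__ == "__main__":
    print(cumulative_vwap_per_day(TRADES))
    # Hypothetical trades with different prices show the volume weighting:
    # 10 units @ 5 and 30 units @ 7 give 6.5, not the unweighted mean 6.0.
    print(cumulative_vwap_per_day([("2016-11-18T10:00:00Z", 10, 5),
                                   ("2016-11-18T11:00:00Z", 30, 7)]))
```

Because every sample trade has price_per_unit 5, the VWAP of the sample data is 5.0 in every bucket; the second call uses made-up prices only to show why summing value and quantity separately matters.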

Answer: this is possible with a date_histogram, a scripted sum and pipeline aggregations such as cumulative_sum (pipeline aggregations require Elasticsearch 2.0 or later). A complete reproduction in console syntax:

DELETE test
PUT test
{
  "mappings": {
    "trade": {
      "properties": {
        "trade_id": {
          "type": "string",
          "index": "not_analyzed"
        },
        "product_id": {
          "type": "string",
          "index": "not_analyzed"
        },
        "quantity": {
          "type": "double"
        },
        "execution_time": {
          "type": "date"
        },
        "price_per_unit": {
          "type": "double"
        }
      }
    }
  }
}

POST test/trade/_bulk
{"index":{}}
{"execution_time":"2016-11-18T22:45:27Z","quantity":10,"price_per_unit":5}
{"index":{}}
{"execution_time":"2016-11-18T22:45:27Z","quantity":10,"price_per_unit":5}
{"index":{}}
{"execution_time":"2016-11-19T22:45:27Z","quantity":10,"price_per_unit":5}
{"index":{}}
{"execution_time":"2016-11-20T22:45:27Z","quantity":10,"price_per_unit":5}
{"index":{}}
{"execution_time":"2016-11-20T22:45:27Z","quantity":10,"price_per_unit":5}
{"index":{}}
{"execution_time":"2016-11-20T22:45:27Z","quantity":10,"price_per_unit":5}
{"index":{}}
{"execution_time":"2016-11-21T22:45:27Z","quantity":10,"price_per_unit":5}
{"index":{}}
{"execution_time":"2016-11-21T22:45:27Z","quantity":10,"price_per_unit":5}
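
If you want to load more test trades than the handful above, the _bulk body can be generated. A minimal Python sketch, assuming the same empty index action as above (index and type come from the URL, Elasticsearch generates the _id); build_bulk_body is a hypothetical helper that only builds the text and does not send it:

```python
import json


def build_bulk_body(docs):
    """Serialize documents into a _bulk body: one action line plus one source line each."""
    compact = (",", ":")  # match the compact JSON style of the request above
    lines = []
    for doc in docs:
        # Empty "index" action: target index/type are taken from the request URL.
        lines.append(json.dumps({"index": {}}, separators=compact))
        lines.append(json.dumps(doc, separators=compact))
    # The bulk API expects every line, including the last, to end with a newline.
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    print(build_bulk_body([
        {"execution_time": "2016-11-18T22:45:27Z", "quantity": 10, "price_per_unit": 5},
    ]), end="")
```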

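POST test/trade/_search
{
  "size": 0,
  "aggs": {
    "sales_per_day": {
      "date_histogram": {
        "field": "execution_time",
        "interval": "day"
      },
      "aggs": {
        "sales": {
          "sum": {
            "script": {
              "lang": "groovy",
              "inline": "doc['quantity'].value * doc['price_per_unit'].value"
            }
          }
        },
        "quantity": {
          "sum": {
            "field": "quantity"
          }
        },
        "cumulative_sales": {
          "cumulative_sum": {
            "buckets_path": "sales"
          }
        },
        "cumulative_quantity": {
          "cumulative_sum": {
            "buckets_path": "quantity"
          }
        },
        "vwap": {
          "bucket_script": {
            "buckets_path": {
              "sales": "cumulative_sales",
              "qty": "cumulative_quantity"
            },
            "script": {
              "lang": "groovy",
              "inline": "sales / qty"
            }
          }
        }
      }
    }
  }
}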

You also need to enable inline scripting for Groovy (on Elasticsearch 2.x this is the script.inline setting in elasticsearch.yml).
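
If you prefer to do the final division on the client instead of in a script, you can read the per-bucket pipeline values from the search response. A minimal Python sketch, assuming each bucket carries cumulative_sales plus a cumulative_quantity value (a cumulative_sum over a sum of quantity; that sub-aggregation name is this sketch's assumption). The function name vwap_from_response is hypothetical, and the response dict in the test is hand-built from the sample data, not captured from a real cluster:

```python
def vwap_from_response(response, agg="sales_per_day"):
    """Return [(bucket key_as_string, cumulative VWAP or None)] from a search response.

    Assumes each date_histogram bucket contains the pipeline results
    "cumulative_sales" and "cumulative_quantity", each shaped {"value": x}.
    """
    out = []
    for bucket in response["aggregations"][agg]["buckets"]:
        sales = bucket["cumulative_sales"]["value"]
        qty = bucket["cumulative_quantity"]["value"]
        # Guard against a zero running quantity instead of dividing by zero.
        out.append((bucket["key_as_string"], sales / qty if qty else None))
    return out
```

Doing the division client-side avoids a second script on the cluster; a bucket_script keeps the whole VWAP computation server-side so each bucket arrives ready to plot.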