QuickSight or Elasticsearch - Column-wise aggregation

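Can this be done in QuickSight or Elasticsearch? I have tried calculated fields in QuickSight and runtime scripts in Elasticsearch, but I can't figure out how to do it. Also, is what I'm expecting even possible with these tools?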

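I'm trying to compute a simple date difference between columns based on the activity, in this case "the time a user takes to 'create a post' after signing up".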

Data input:

Data output:

You can use a scripted metric aggregation.
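To make the four phases of a scripted metric aggregation concrete, here is a local Python simulation of how data flows through them for one bucket: the init and combine steps run once per shard, the map step once per document, and the reduce step once over all per-shard results. This is only a sketch; `run_scripted_metric` and the in-memory "shards" are hypothetical stand-ins, not an Elasticsearch API.

```python
from typing import Any, Callable, Dict, List

State = Dict[str, Any]


def run_scripted_metric(
    shards: List[List[dict]],
    init_state: Callable[[State], None],
    map_doc: Callable[[State, dict], None],
    combine_state: Callable[[State], Any],
    reduce_states: Callable[[List[Any]], Any],
) -> Any:
    """Simulate the phases of a scripted_metric aggregation for one bucket."""
    per_shard = []
    for docs in shards:
        state: State = {}
        init_state(state)                       # init_script: once per shard, before any document
        for doc in docs:
            map_doc(state, doc)                 # map_script: once per collected document
        per_shard.append(combine_state(state))  # combine_script: once per shard, after collection
    return reduce_states(per_shard)             # reduce_script: once, on the coordinating node


# Usage: count documents across two hypothetical shards.
shards = [
    [{"activity": "Logged In"}, {"activity": "Created a post"}],
    [{"activity": "Logged In"}],
]
total = run_scripted_metric(
    shards,
    init_state=lambda state: state.update(count=0),
    map_doc=lambda state, doc: state.update(count=state["count"] + 1),
    combine_state=lambda state: state["count"],
    reduce_states=sum,
)
print(total)  # 3
```

The key point is that each shard only sees its own documents, so anything that needs a global view (such as sorting a user's events by time) has to happen in the reduce step.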

Data

"hits" : [
      {
        "_index" : "index121",
        "_type" : "_doc",
        "_id" : "aqJ3HnoBF6_U07qsNY-s",
        "_score" : 1.0,
        "_source" : {
          "user" : "Jen",
          "activity" : "Logged In",
          "activity_Time" : "2020-01-08"
        }
      },
      {
        "_index" : "index121",
        "_type" : "_doc",
        "_id" : "a6J3HnoBF6_U07qsXY_8",
        "_score" : 1.0,
        "_source" : {
          "user" : "Jen",
          "activity" : "Created a post",
          "activity_Time" : "2020-05-08"
        }
      },
      {
        "_index" : "index121",
        "_type" : "_doc",
        "_id" : "bKJ3HnoBF6_U07qsk4-0",
        "_score" : 1.0,
        "_source" : {
          "user" : "Mark",
          "activity" : "Logged In",
          "activity_Time" : "2020-01-03"
        }
      },
      {
        "_index" : "index121",
        "_type" : "_doc",
        "_id" : "baJ3HnoBF6_U07qsu48g",
        "_score" : 1.0,
        "_source" : {
          "user" : "Mark",
          "activity" : "Created a post",
          "activity_Time" : "2020-01-08"
        }
      }
    ]

Query

{
  "size": 0,
  "aggs": {
    "user": {
      "terms": {
        "field": "user.keyword",
        "size": 10000
      },
      "aggs": {
        "distinct_sum_feedback": {
          "scripted_metric": {
            "init_script": "state.docs = []",
            "map_script": """
              // Store only plain values (epoch millis and a string): the state
              // may hold only primitives, strings, lists and maps
              Map span = [
                'timestamp': doc['activity_Time'].value.toInstant().toEpochMilli(),
                'activity': doc['activity.keyword'].value
              ];
              state.docs.add(span);
            """,
            "combine_script": "return state.docs;",
            "reduce_script": """
              // Merge the lists returned by all shards into one list
              def all_docs = [];
              for (s in states) {
                for (span in s) {
                  all_docs.add(span);
                }
              }
              // Sort this user's events chronologically
              all_docs.sort((Map a, Map b) -> Long.compare(a.timestamp, b.timestamp));

              Map result = new HashMap();
              boolean found = false;
              long loggedIn = 0;
              for (s in all_docs) {
                if (s.activity == 'Logged In') {
                  // Remember the most recent login
                  loggedIn = s.timestamp;
                  found = true;
                }
                if (s.activity == 'Created a post' && found) {
                  // Reset so that only the first post after a login is counted
                  found = false;
                  // Key by login date, e.g. "2020-JANUARY-8"
                  ZonedDateTime login = ZonedDateTime.ofInstant(Instant.ofEpochMilli(loggedIn), ZoneId.of('Z'));
                  def dt = login.getYear() + '-' + login.getMonth() + '-' + login.getDayOfMonth();
                  long diff = s.timestamp - loggedIn;
                  // One entry per login date; convert milliseconds to whole days
                  if (result.get(dt) == null) {
                    result.put(dt, diff / 1000 / 60 / 60 / 24);
                  }
                }
              }
              return result;
            """
          }
        }
      }
    }
  }
}

Result

"user" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        {
          "key" : "Jen",
          "doc_count" : 2,
          "distinct_sum_feedback" : {
            "value" : {
              "2020-JANUARY-8" : 121
            }
          }
        },
        {
          "key" : "Mark",
          "doc_count" : 2,
          "distinct_sum_feedback" : {
            "value" : {
              "2020-JANUARY-3" : 5
            }
          }
        }
      ]
    }
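The values in the result are whole days, obtained by integer division of the millisecond difference. For Jen (login 2020-01-08, post 2020-05-08, with 2020 a leap year) the arithmetic works out as follows; Mark's 5 days likewise correspond to 432 000 000 ms.

```latex
\Delta t = t_{\text{post}} - t_{\text{login}} = 121 \times 86\,400\,000\ \text{ms} = 10\,454\,400\,000\ \text{ms}

\left\lfloor \frac{\Delta t}{1000 \cdot 60 \cdot 60 \cdot 24} \right\rfloor
  = \left\lfloor \frac{10\,454\,400\,000}{86\,400\,000} \right\rfloor = 121
```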

Explanation

  1. "init_script":

Executed prior to any collection of documents. Allows the aggregation to set up any initial state.

Here it initializes state.docs as an empty list that will hold one map per document.

  2. "map_script":

Executed once per document collected. Here it is called for each document in the user's bucket and adds a map of the document's activity and timestamp to state.docs.

  3. "combine_script":

Executed once on each shard after document collection is complete.

Here it returns that shard's list of maps.

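  4. "reduce_script":

Executed once on the coordinating node after all shards have returned their results.

Here it merges the lists returned by all shards into a single list and sorts it by timestamp. It then walks through the sorted list and, for each "Logged In" event followed by a "Created a post" event, stores the difference between the login time and the post creation time (in whole days), keyed by the login date.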
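The map and reduce logic described above can be checked outside Elasticsearch. The Python sketch below is a hypothetical port, not the aggregation itself: `map_doc`, `reduce_states` and `per_user` are illustrative names, the split of the sample documents across two shards is invented to show why the reduce step has to merge and re-sort, and dates are assumed to be UTC midnight. It reproduces the keys and day counts from the Result section.

```python
from datetime import datetime, timezone
from typing import Dict, List

# Sample _source documents from the "Data" section, split across two
# HYPOTHETICAL shards so that each user's events arrive out of order.
SHARD_A = [
    {"user": "Jen", "activity": "Logged In", "activity_Time": "2020-01-08"},
    {"user": "Mark", "activity": "Created a post", "activity_Time": "2020-01-08"},
]
SHARD_B = [
    {"user": "Jen", "activity": "Created a post", "activity_Time": "2020-05-08"},
    {"user": "Mark", "activity": "Logged In", "activity_Time": "2020-01-03"},
]

MS_PER_DAY = 1000 * 60 * 60 * 24


def map_doc(source: dict) -> dict:
    """map_script: keep only an epoch-millisecond timestamp and the activity."""
    day = datetime.strptime(source["activity_Time"], "%Y-%m-%d").replace(tzinfo=timezone.utc)
    return {"timestamp": int(day.timestamp()) * 1000, "activity": source["activity"]}


def reduce_states(states: List[List[dict]]) -> Dict[str, int]:
    """reduce_script: merge shard lists, sort by time, pair each login with the next post."""
    all_docs = sorted((span for s in states for span in s), key=lambda d: d["timestamp"])
    result: Dict[str, int] = {}
    logged_in = None  # timestamp of the most recent unmatched login
    for span in all_docs:
        if span["activity"] == "Logged In":
            logged_in = span["timestamp"]
        elif span["activity"] == "Created a post" and logged_in is not None:
            login = datetime.fromtimestamp(logged_in / 1000, tz=timezone.utc)
            key = f"{login.year}-{login.strftime('%B').upper()}-{login.day}"
            # Keep the first value per login date; whole days via integer division
            result.setdefault(key, (span["timestamp"] - logged_in) // MS_PER_DAY)
            logged_in = None  # only the first post after a login counts
    return result


def per_user(shards: List[List[dict]]) -> Dict[str, Dict[str, int]]:
    """terms agg on user: map/combine per shard and user, then reduce per user."""
    users = sorted({doc["user"] for shard in shards for doc in shard})
    return {
        user: reduce_states([[map_doc(d) for d in shard if d["user"] == user] for shard in shards])
        for user in users
    }


print(per_user([SHARD_A, SHARD_B]))
# {'Jen': {'2020-JANUARY-8': 121}, 'Mark': {'2020-JANUARY-3': 5}}
```

Because each shard's list is only sorted after the merge, Mark's post (seen first on shard A) is still correctly paired with his earlier login from shard B.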