使用查询结果作为 Elasticsearch DSL 中另一个查询的参数

Use query result as parameter for another query in Elasticsearch DSL

我正在使用 Elasticsearch DSL,我正在尝试将查询结果用作另一个查询的参数,如下所示:

{
  "query": {
    "bool": {
      "must_not": {
        "terms": {
          "request_id": {
            "query": {
              "match": {
                "processing.message": "OUT Followup Synthesis"
              }
            },
            "fields": [
              "request_id"
            ],
            "_source": false
          }
        }
      }
    }
  }
}

正如您在上面看到的,我正在尝试搜索他们的 request_id 不是 request_ids 之一且 processing.message 等于 OUT Followup Synthesis 的来源。

我在查询时遇到错误:

Error loading data [x_content_parse_exception] [1:1660] [terms_lookup] unknown field [query]

如何使用 Elasticsearch DSL 实现我的目标?

从评论中提取的原始问题

I'm trying to fetch data with processing.message equals to 'IN Followup Sythesis' with their request_id doesn't appear in data with processing.message equals to 'OUT Followup Sythesis'. In SQL language:

SELECT d FROM   data d
WHERE  d.processing.message = 'IN Followup Sythesis'
       AND d.request_id NOT IN (SELECT request_id FROM data WHERE processing.message = 'OUT Followup Sythesis'); 

回答:一般来说,Elasticsearchapplication-side joins nor 都没有。

因此您必须 运行 您的第一个查询,获取检索到的 ID 并将它们放入第二个查询中 — 最好是 terms query.


当然,这个限制可以通过“劫持”一个scripted metric aggregation来克服。

以这3个文件为例:

POST reqs/_doc
{"request_id":"abc","processing":{"message":"OUT Followup Synthesis"}}

POST reqs/_doc
{"request_id":"abc","processing":{"message":"IN Followup Sythesis"}}

POST reqs/_doc
{"request_id":"xyz","processing":{"message":"IN Followup Sythesis"}}

你可以运行

POST reqs/_search
{
  "size": 0,
  "query": {
    "match": {
      "processing.message": "IN Followup Sythesis"
    }
  },
  "aggs": {
    "subquery_mock": {
      "scripted_metric": {
        "params": {
          "disallowed_msg": "OUT Followup Synthesis"
        }, 
        "init_script": "state.by_request_ids = [:]; state.disallowed_request_ids = [];",
        "map_script": """
          def req_id = params._source.request_id;
          def msg = params._source.processing.message;
          
          if (msg.contains(params.disallowed_msg)) {
            state.disallowed_request_ids.add(req_id);
            // won't need this particular doc so continue looping
            return;
          }
          
          if (state.by_request_ids.containsKey(req_id)) {
            // there may be multiple docs under the same ID
            // so concatenate them
            state.by_request_ids[req_id].add(params._source);
          } else {
            // initialize an appendable arraylist
            state.by_request_ids[req_id] = [params._source];
          }
        """,
        "combine_script": """
          state.by_request_ids.entrySet()
            .removeIf(entry -> state.disallowed_request_ids.contains(entry.getKey()));
          return state.by_request_ids
        """,
        "reduce_script": "return states"
      }
    }
  }
}

哪个 return 只有正确的请求:

"aggregations" : {
  "subquery_mock" : {
    "value" : [
      {
        "xyz" : [
          {
            "processing" : { "message" : "IN Followup Sythesis" },
            "request_id" : "xyz"
          }
        ]
      }
    ]
  }
}

⚠️ 这几乎肯定会很慢,并且违背了 not accessing the _source field 的建议指导。但它也表明可以“模拟”子查询。

我建议先在较小的文档集上测试此脚本,然后再让它以整个索引为目标 — 也许可以通过日期 range query 或类似日期来限制它。


仅供参考 Elasticsearch 公开了一个 SQL API,尽管它仅通过付费产品 X-Pack 提供。