Elasticsearch pre-processing to remove null fields as part of ingest
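I have a use case where an API that I call to retrieve data for Elasticsearch returns null values.
I need to write an ingest pipeline that uses processors to remove all null fields before the document is written to Elasticsearch. The processors may or may not use Painless scripting.
Here is a sample payload that I currently get from the API: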
{
  "master_desc": "TESTING PART",
  "date_added": "2019-10-24T09:30:03",
  "master_no": {
    "master_no": 18460110,
    "barcode": "NLSKYTEST1-1",
    "external_key": null,
    "umid": null
  }
}
Ideally, the pipeline should insert the document as:
{
  "master_desc": "TESTING PART",
  "date_added": "2019-10-24T09:30:03",
  "master_no": {
    "master_no": 18460110,
    "barcode": "NLSKYTEST1-1"
  }
}
Note that the fields are dynamic, so I cannot write a processor that checks for nulls against a fixed set of fields.
Thanks!
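Null fields are neither indexed nor searchable. I have written the pipeline below to remove such fields. Please test it against all of your scenarios before using it. After indexing documents through this pipeline, you will not be able to find the removed null fields with an "exists" query.
Pipeline: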
PUT _ingest/pipeline/remove_null_fields
{
  "description": "Remove any null field",
  "processors": [
    {
      "script": {
        "source": """
          // Return the list of field paths whose values are null
          def loopAllFields(def x) {
            def ret = [];
            if (x instanceof Map) {
              for (entry in x.entrySet()) {
                // Skip metadata keys such as _index and _id
                if (entry.getKey().indexOf("_") == 0) {
                  continue;
                }
                def val = entry.getValue();
                if (val instanceof HashMap ||
                    val instanceof Map ||
                    val instanceof ArrayList) {
                  def list = [];
                  if (val instanceof ArrayList) {
                    def index = 0;
                    // Recurse into each object in the ArrayList
                    for (v in val) {
                      list = loopAllFields(v);
                      for (item in list) {
                        ret.add(entry.getKey() + "[" + index + "]." + item);
                      }
                      index++;
                    }
                  } else {
                    list = loopAllFields(val);
                  }
                  if (list.size() == val.size()) {
                    // Every child is null, so remove the parent field itself
                    ret.add(entry.getKey());
                  } else {
                    for (item in list) {
                      ret.add(entry.getKey() + "." + item);
                    }
                  }
                }
                if (val == null) {
                  ret.add(entry.getKey());
                }
              }
            }
            return ret;
          }

          /* Remove a field from the source; for dotted paths, recurse into the parent field */
          def removeField(def ctx, def fieldname) {
            def pos = fieldname.indexOf(".");
            if (pos > 0) {
              def str = fieldname.substring(0, pos);
              if (str.indexOf('[') > 0 && str.indexOf(']') > 0) {
                // Path segment of the form name[index]
                def s = str.substring(0, str.indexOf('['));
                def i = str.substring(str.indexOf('[') + 1, str.length() - 1);
                removeField(ctx[s][Integer.parseInt(i)], fieldname.substring(pos + 1, fieldname.length()));
              } else {
                if (ctx[str] instanceof Map) {
                  removeField(ctx[str], fieldname.substring(pos + 1, fieldname.length()));
                }
              }
            } else {
              ctx.remove(fieldname);
            }
            return ctx;
          }

          def list = [];
          list = loopAllFields(ctx);
          for (item in list) {
            removeField(ctx, item);
          }
        """
      }
    }
  ]
}
Post a document:
POST index8/_doc?pipeline=remove_null_fields
{
  "master_desc": "TESTING PART",
  "ddd": null,
  "date_added": "2019-10-24T09:30:03",
  "master_no": {
    "master_no": 18460110,
    "barcode": "NLSKYTEST1-1",
    "external_key": null,
    "umid": null
  }
}
Result:
"hits" : [
  {
    "_index" : "index8",
    "_type" : "_doc",
    "_id" : "06XAyXEBAWHHnYGOSa_M",
    "_score" : 1.0,
    "_source" : {
      "date_added" : "2019-10-24T09:30:03",
      "master_no" : {
        "master_no" : 18460110,
        "barcode" : "NLSKYTEST1-1"
      },
      "master_desc" : "TESTING PART"
    }
  }
]
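Since the answer recommends testing the pipeline before relying on it, one option is the ingest simulate API, which runs a pipeline against sample documents without indexing anything. The request below is a minimal sketch that reuses the `remove_null_fields` pipeline and the sample fields from this thread; the document body is only an illustration and should be swapped for your own payloads.

```
POST _ingest/pipeline/remove_null_fields/_simulate
{
  "docs": [
    {
      "_source": {
        "master_desc": "TESTING PART",
        "ddd": null,
        "master_no": {
          "master_no": 18460110,
          "barcode": "NLSKYTEST1-1",
          "external_key": null,
          "umid": null
        }
      }
    }
  ]
}
```

The response contains each document as it would look after the processors have run, so the remaining fields can be checked before any real data is posted.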
@Jaspreet, the script almost worked. However, it did not remove empty objects, empty arrays, or empty values. Here is the document I tried to index:
{
  "master_desc": "TESTING PART",
  "date_added": "2019-10-24T09:30:03",
  "master_no": {
    "master_no": 18460110,
    "barcode": "NLSKYTEST1-1",
    "external_key": null,
    "umid": null
  },
  "remote_sync_state": "",
  "lib_title_footage": [],
  "prj_no": {
    "prj_no": null,
    "prj_desc": null
  }
}
The above returns:
{
  "master_desc": "TESTING PART",
  "date_added": "2019-10-24T09:30:03",
  "master_no": {
    "master_no": 18460110,
    "barcode": "NLSKYTEST1-1"
  },
  "remote_sync_state": "",
  "lib_title_footage": [ ],
  "prj_no": { }
}
I tried updating the script to add conditional checks for these patterns, but unfortunately ran into compilation errors.
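One possible way to cover these cases is to prune the document bottom-up with a single recursive function, so that a parent is removed once all of its children are gone. The pipeline below is an untested sketch, not the script from the answer: the pipeline name `remove_empty_fields` and the helper name `isEmptyAfterPrune` are made up for illustration, and it assumes that null, an empty string, an empty array, and an object with no remaining fields should all count as empty. Like the answer's script, it leaves top-level keys that start with `_` alone.

```
PUT _ingest/pipeline/remove_empty_fields
{
  "description": "Remove null, empty-string, empty-array and empty-object fields (sketch)",
  "processors": [
    {
      "script": {
        "lang": "painless",
        "source": """
          // Hypothetical helper: prunes 'value' in place and returns true
          // when the value itself should be removed by its parent.
          boolean isEmptyAfterPrune(def value) {
            if (value == null) {
              return true;
            }
            if (value instanceof String) {
              return value.isEmpty();
            }
            if (value instanceof Map) {
              def iter = value.entrySet().iterator();
              while (iter.hasNext()) {
                if (isEmptyAfterPrune(iter.next().getValue())) {
                  iter.remove();
                }
              }
              return value.isEmpty();
            }
            if (value instanceof List) {
              def iter = value.iterator();
              while (iter.hasNext()) {
                if (isEmptyAfterPrune(iter.next())) {
                  iter.remove();
                }
              }
              return value.isEmpty();
            }
            // Numbers, booleans and non-empty strings are kept
            return false;
          }

          // Walk the top level of the document, skipping metadata keys such as _index
          def iter = ctx.entrySet().iterator();
          while (iter.hasNext()) {
            def entry = iter.next();
            if (entry.getKey().startsWith("_")) {
              continue;
            }
            if (isEmptyAfterPrune(entry.getValue())) {
              iter.remove();
            }
          }
        """
      }
    }
  ]
}
```

The intent is that, for the document above, `remote_sync_state`, `lib_title_footage` and `prj_no` are dropped along with the null fields. Note that this sketch also removes null or empty elements from inside arrays, which changes element positions; if array positions matter, that branch would need to be adjusted.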