How to add an array of dictionaries via the Logstash mutate filter from a CSV?

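I have written a Logstash config file to upload a CSV. The CSV contains information about several applicants, and I need it uploaded to the Kibana index as an array of dictionaries, not as a dictionary of indexed dictionaries.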

filter {
    csv {
        separator => ","
        skip_header => true
        columns => [LoanID,Applicant_Income1,Occupation1,Time_At_Work1,Date_Of_Join1,Gender,LoanAmount,Marital_Status,Dependents,Education,Self_Employed,Applicant_Income2,Occupation2,Time_At_Work2,Date_Of_Join2,Applicant_Income3,Occupation3,Time_At_Work3,Date_Of_Join3]
    }
    mutate {
        convert => {
            "Applicant_Income1" => "float"
            "Time_At_Work1" => "float"
            "LoanAmount" => "float"
            "Applicant_Income2" => "float"
            "Time_At_Work2" => "float"
            "Applicant_Income3" => "float"
            "Time_At_Work3" => "float"
        }
    }
    mutate {
        rename => {
            "Applicant_Income1" => "[Applicant][0][Applicant_Income]"
            "Occupation1" => "[Applicant][0][Occupation]"
            "Time_At_Work1" => "[Applicant][0][Time_At_Work]"
            "Date_Of_Join1" => "[Applicant][0][Date_Of_Join]"
            "Applicant_Income2" => "[Applicant][1][Applicant_Income]"
            "Occupation2" => "[Applicant][1][Occupation]"
            "Time_At_Work2" => "[Applicant][1][Time_At_Work]"
            "Date_Of_Join2" => "[Applicant][1][Date_Of_Join]"
            "Applicant_Income3" => "[Applicant][2][Applicant_Income]"
            "Occupation3" => "[Applicant][2][Occupation]"
            "Time_At_Work3" => "[Applicant][2][Time_At_Work]"
            "Date_Of_Join3" => "[Applicant][2][Date_Of_Join]"
        }
    }
    date {
        match => [ "Date_Of_Join1", "yyyy-MM-dd'T'HH:mm:ss.SSZZ" ]
    }
    date {
        match => [ "Date_Of_Join2", "yyyy-MM-dd'T'HH:mm:ss.SSZZ" ]
    }
    date {
        match => [ "Date_Of_Join3", "yyyy-MM-dd'T'HH:mm:ss.SSZZ" ]
    }
}

What I get is an Applicant field that is an object with the keys 0, 1 and 2, one per applicant.

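But I need the Applicant field to be an array of dictionaries, where each element holds one applicant's Applicant_Income, Occupation, Time_At_Work and Date_Of_Join.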

I tried add_field, but it didn't work:

    mutate {
        add_field => {
            "[Applicant][Applicant_Income1]" => "Applicant_Income1",
            "[Applicant][Occupation1]" => "Occupation1",
            "[Applicant][Time_At_Work1]" => "Time_At_Work1",
            "[Applicant][Date_Of_Join1]" => "Date_Of_Join1"
        }
    }

Square brackets in a Logstash field reference do not create array elements/entries the way indexing does in other programming languages such as Java.

[Applicant][0][Applicant_Income]

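is not the correct syntax for setting the Applicant_Income field of the first element (zero-based index) of an Applicant array. Instead, it creates the sub-elements 0, 1 and 2 under the Applicant element, as shown in figure 1.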
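To make the difference concrete, here is a small, simplified Ruby model of writing a reference like `[Applicant][0][Applicant_Income]` into an event that has no `Applicant` field yet. It is not Logstash's actual implementation: `set_path` is a hypothetical helper that, matching the behaviour described above, creates a hash for every missing path segment, so `0` ends up as a string key rather than an array index. The sample values are made up.

```ruby
# Hypothetical, simplified model of writing a Logstash-style field reference
# such as "[Applicant][0][Applicant_Income]" into an event stored as a Hash.
# Every missing intermediate segment is created as a Hash, so "0" is a key.
def set_path(event, reference, value)
  keys = reference.scan(/\[([^\]]+)\]/).flatten # ["Applicant", "0", "Applicant_Income"]
  parent = keys[0..-2].reduce(event) { |node, key| node[key] ||= {} } # never an Array
  parent[keys.last] = value
  event
end

event = {}
set_path(event, "[Applicant][0][Applicant_Income]", 5000.0)
set_path(event, "[Applicant][0][Occupation]", "Engineer")
set_path(event, "[Applicant][1][Applicant_Income]", 3000.0)

p event["Applicant"].class # Hash, not Array
p event["Applicant"].keys  # keys are the strings "0" and "1"
```

The result is an object whose keys happen to look like numbers, which is exactly what Elasticsearch then indexes as an object with fields named `0`, `1` and `2`.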

To create an array of objects, use the ruby filter plugin (https://www.elastic.co/guide/en/logstash/current/plugins-filters-ruby.html). Because that filter lets you execute arbitrary Ruby code, it gives you more control and freedom:

filter {
  csv {
    separator => ","
    skip_header => true
    columns => [LoanID,Applicant_Income1,Occupation1,Time_At_Work1,Date_Of_Join1,Gender,LoanAmount,Marital_Status,Dependents,Education,Self_Employed,Applicant_Income2,Occupation2,Time_At_Work2,Date_Of_Join2,Applicant_Income3,Occupation3,Time_At_Work3,Date_Of_Join3]
  }

  mutate {
    convert => {
      "Applicant_Income1" => "float"
      "Time_At_Work1" => "float"
      "LoanAmount" => "float"
      "Applicant_Income2" => "float"
      "Time_At_Work2" => "float"
      "Applicant_Income3" => "float"
      "Time_At_Work3" => "float"
    }
  }

  # Parse the dates before building the array and write each result back to
  # its own field; without "target" the date filter overwrites @timestamp.
  date {
    match => [ "Date_Of_Join1", "yyyy-MM-dd'T'HH:mm:ss.SSZZ" ]
    target => "Date_Of_Join1"
  }

  date {
    match => [ "Date_Of_Join2", "yyyy-MM-dd'T'HH:mm:ss.SSZZ" ]
    target => "Date_Of_Join2"
  }

  date {
    match => [ "Date_Of_Join3", "yyyy-MM-dd'T'HH:mm:ss.SSZZ" ]
    target => "Date_Of_Join3"
  }

  ruby {
    code => '
      # Build one hash per applicant from the numbered columns 1..3
      applicants = (1..3).map do |i|
        {
          "Applicant_Income" => event.get("Applicant_Income#{i}"),
          "Occupation" => event.get("Occupation#{i}"),
          "Time_At_Work" => event.get("Time_At_Work#{i}"),
          "Date_Of_Join" => event.get("Date_Of_Join#{i}")
        }
      end
      # Store the array of hashes as the "Applicant" field
      event.set("Applicant", applicants)
    '
  }

  # The flat numbered columns are now copied into "Applicant"; drop them
  mutate {
    remove_field => [
      "Applicant_Income1",
      "Occupation1",
      "Time_At_Work1",
      "Date_Of_Join1",
      "Applicant_Income2",
      "Occupation2",
      "Time_At_Work2",
      "Date_Of_Join2",
      "Applicant_Income3",
      "Occupation3",
      "Time_At_Work3",
      "Date_Of_Join3"
    ]
  }
}
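If the number of applicant column groups can vary, listing every applicant by hand does not scale. A possible alternative is to group columns by their trailing number. The sketch below is plain Ruby run on an ordinary Hash standing in for the event's fields; the helper `group_numbered_fields` and the sample row are hypothetical. Inside a ruby filter, one could assume the same logic is fed `event.to_hash` and its result stored with `event.set`.

```ruby
# Hypothetical helper: group flat columns such as "Occupation1", "Occupation2"
# into one hash per trailing number. Columns whose names are not one of the
# given prefixes followed by digits (e.g. "LoanID") are ignored.
def group_numbered_fields(fields, prefixes)
  alternatives = prefixes.map { |prefix| Regexp.escape(prefix) }.join("|")
  pattern = /\A(#{alternatives})(\d+)\z/
  groups = Hash.new { |hash, number| hash[number] = {} }

  fields.each do |name, value|
    match = pattern.match(name)
    next unless match
    groups[match[2].to_i][match[1]] = value
  end

  # Order by applicant number and drop groups whose values are all nil
  groups.sort_by { |number, _| number }
        .map { |_, applicant| applicant }
        .reject { |applicant| applicant.values.all?(&:nil?) }
end

# Made-up sample row, shaped like the CSV columns in the question
row = {
  "LoanID" => "LP001",
  "Applicant_Income1" => 5000.0, "Occupation1" => "Engineer",
  "Applicant_Income2" => 3000.0, "Occupation2" => "Teacher",
  "Applicant_Income3" => nil, "Occupation3" => nil
}

applicants = group_numbered_fields(row, %w[Applicant_Income Occupation Time_At_Work Date_Of_Join])
p applicants
```

Dropping all-nil groups is a design choice of this sketch, so a row with only two applicants yields a two-element array instead of a third object full of nulls.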

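event.set adds a field to the document: the first argument is the field name and the second is its value. Here it adds the field "Applicant" to the document, with the array of objects as its value.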

event.get returns the value of a field in the document; you pass the field name to the method.
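The get/set pattern can be tried outside Logstash with a tiny stand-in object. `StubEvent` below is hypothetical and only mimics top-level `get`/`set`; the real Event class does more (for example, it accepts nested field references), so treat this as an illustration of the data flow only. The sample values are made up.

```ruby
# Hypothetical stand-in for a Logstash event: keeps fields in a Hash and
# exposes get/set for top-level field names only.
class StubEvent
  def initialize(fields = {})
    @fields = fields.dup
  end

  def get(name)
    @fields[name]
  end

  def set(name, value)
    @fields[name] = value
  end

  def to_hash
    @fields.dup
  end
end

event = StubEvent.new({ "Applicant_Income1" => 5000.0, "Occupation1" => "Engineer" })

# get reads a field's value by name...
income = event.get("Applicant_Income1")

# ...and set writes a field; here the value is an array of hashes
event.set("Applicant", [{ "Applicant_Income" => income, "Occupation" => event.get("Occupation1") }])

p event.get("Applicant")
```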

See this guide, https://www.elastic.co/guide/en/logstash/current/event-api.html, for more insight into the Event API.

Hope this helps.