聚合多个递归 logstash

aggregate multiple recursive logstash

我正在将 logstash 与输入 jdbc 结合使用,并希望通过聚合将一个对象嵌入到另一个对象中。 如何使用递归添加?

即在另一个对象中添加一个对象?

这将是一个例子:

{
  "_index": "my-index",
  "_type": "test",
  "_id": "1",
  "_version": 1,
  "_score": 1,
  "_source": {
    "id": "1",
    "properties": {
      "nested_1": [
        {
          "A": 0,
          "B": "true",
          "C": "PEREZ, MATIAS  ROGELIO Y/O",
          "Nested_2": [
            {
              "Z1": "true",
              "Z2": "99999"
            }
        },
        {
          "A": 0,
          "B": "true",
          "C": "SALVADOR MATIAS ROMERO",
          "Nested_2": [
            {
              "Z1": "true",
              "Z2": "99999"
            }
        }
      ]
    }
  }
}

我正在使用类似的东西,但它不起作用

aggregate {
  task_id => "%{id}"
  code => "
      map['id'] = event.get('id')
      
      map['nested_1_list'] ||= []
      map['nested_1'] ||= []
      if (event.get('id') != nil)
        if !( map['nested_1_list'].include?event.get('id') ) 
          map['nested_1_list'] << event.get('id')
 
          map['nested_1'] << {
            'A' => event.get('a'),                             
            'B' => event.get('b'),
            'C' => event.get('c'),
            
             map['nested_2_list'] ||= []
              map['nested_2'] ||= []
              if (event.get('id_2') != nil)
                if !( map['nested_2_list'].include?event.get('id_2') ) 
                  map['nested_2_list'] << event.get('id_2')
         
                  map['nested_2'] << {
                    'Z1' => event.get('z1'), 
                    'Z2' => event.get('z2')
                  }
                end
              end
          }
        end
      end
       
      event.cancel()
  "
  push_previous_map_as_event => true
  timeout => 3

} 

知道如何实现吗?................................ ..........

最后我所做的是,从输入生成 JSON,即从 logstash 输入语句的视图 (vw) 使用的存储过程。

消费后,我将其处理为 json,并且我已经将 json 作为另一个变量使用。

# Convierto el string a json real (quita comillas y barras invertidas)
        ruby {
            code => "
                require 'json'
                json_value = JSON.parse(event.get('field_db').to_s)
                event.set('field_convert_to_json',json_value)
            "
        }

也许你可以试试这个。注意 这仅适用于您想要拥有单个对象而不是对象数组的情况。 请访问我的博客以获取其他格式。 https://xyzcoder.github.io/2020/07/29/indexing-documents-using-logstash-and-python.html

input {
    jdbc {
           jdbc_driver_library => "/usr/share/logstash/javalib/mssql-jdbc-8.2.2.jre11.jar"
           jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
           jdbc_connection_string => "jdbc:sqlserver://host.docker.internal;database=Whosebug2010;user=pavan;password=pavankumar@123"
           jdbc_user => "pavan"
           jdbc_password => "pavankumar@123"
           statement => "select top 500 p.Id as PostId,p.AcceptedAnswerId,p.AnswerCount,p.Body,u.Id as userid,u.DisplayName,u.Location
                        from Whosebug2010.dbo.Posts p inner join Whosebug2010.dbo.Users u
                        on p.OwnerUserId=u.Id"
        }
}

filter {
    aggregate {
        task_id => "%{postid}"
        code => "
            map['postid'] = event.get('postid')
            map['accepted_answer_id'] = event.get('acceptedanswerid')
            map['answer_count'] = event.get('answercount')
            map['body'] = event.get('body')
            map['user'] = {
                'id' => event.get('userid'),
                'displayname' => event.get('displayname'),
                'location' => event.get('location')
            }
            map['user']['test'] = {
                    'test_body' => event.get('postid')
                }
        event.cancel()"
        push_previous_map_as_event => true
        timeout => 30
    }
}

output {
    elasticsearch {
        hosts => ["http://elasticsearch:9200", "http://elasticsearch:9200"]
        index => "Whosebug_top"
    }
    stdout {
        codec => rubydebug
    }
}

我的输出是

{
        "_index" : "Whosebug_top",
        "_type" : "_doc",
        "_id" : "S8WEmnMBrXsRTNbKO0JJ",
        "_score" : 1.0,
        "_source" : {
          "@version" : "1",
          "body" : """<p>How do I store binary data in <a href="http://en.wikipedia.org/wiki/MySQL" rel="noreferrer">MySQL</a>?</p>
""",
          "@timestamp" : "2020-07-29T12:20:22.649Z",
          "answer_count" : 10,
          "user" : {
            "displayname" : "Geoff Dalgas",
            "location" : "Corvallis, OR",
            "test" : {
              "test_body" : 17
            },
            "id" : 2
          },
          "postid" : 17,
          "accepted_answer_id" : 26
        }

这里测试对象嵌套到用户对象中