Nginx Logs Pipeline using ELK Stack + Kafka

Hello, I am trying to send JSON-formatted nginx logs via Filebeat to Kafka, then to Logstash, then to Elasticsearch, and finally visualize them with Kibana.
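
For context, the Filebeat side just ships the log file to the same Kafka broker and topic that the Logstash input below consumes. A rough sketch of that output section (simplified, assuming a standard output.kafka block; not the complete Filebeat config):

# Assumed Filebeat output section (sketch, not the full config):
output.kafka:
  hosts: ["10.10.10.240:9092"]   # same broker the Logstash kafka input points at
  topic: "payments-nginx"        # same topic the Logstash pipeline reads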

My nginx log format in JSON is as follows:

'{"@timestamp":"$time_iso8601","host":"$hostname",'
'"server_ip":"$server_addr","client_ip":"$remote_addr",'
'"xff":"$http_x_forwarded_for","domain":"$host",'
'"url":"$uri","referer":"$http_referer",'
'"args":"$args","upstreamtime":"$upstream_response_time",'
'"responsetime":"$request_time","request_method":"$request_method",'
'"status":"$status","size":"$body_bytes_sent",'
'"request_body":"$request_body","request_length":"$request_length",'
'"protocol":"$server_protocol","upstreamhost":"$upstream_addr",'
'"file_dir":"$request_filename","http_user_agent":"$http_user_agent"'
'}'
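
For reference, a sketch of how a format string like the one above is normally declared and attached in nginx.conf. The format name "json_log" is a placeholder, only a few of the fields are repeated for brevity, and escape=json (available in nginx 1.11.8+) matters because values such as the user agent can otherwise contain characters that break the JSON:

# Sketch: wiring the JSON format string into nginx.conf ("json_log" is a placeholder name)
http {
    log_format json_log escape=json
        '{"@timestamp":"$time_iso8601","host":"$hostname",'
        '"client_ip":"$remote_addr","status":"$status",'
        '"responsetime":"$request_time","http_user_agent":"$http_user_agent"'
        '}';

    access_log /var/log/nginx/new_json.log json_log;
}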

My Logstash config file:

input {
  kafka {
    bootstrap_servers => "10.10.10.240:9092"
    topics => ["payments-nginx"]
    codec => json
  }
}
filter {
  geoip {
    source => "client_ip"
    remove_field => [ "[geoip][location][lon]", "[geoip][location][lat]" ]
  }
  useragent {
    source => "http_user_agent"
    target => "ua"
    remove_field => [ "[ua][minor]", "[ua][major]", "[ua][build]", "[ua][os_name]" ]
  }
  mutate {
    remove_field => [ "xff", "args", "request_body", "http_user_agent" ]
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9202"]
    manage_template => false
    index => "payments-nginx-%{+YYYY.MM.dd}"
  }
  #stdout { codec => rubydebug }
}

Problem: I cannot see fields such as status, responsetime, and client_ip as separate fields in Kibana. They show up inside the message field, but Logstash does not parse out the client_ip field, and my geoip lookup fails as well. The Logstash logs show no errors, yet the fields are still not parsed. Any solution or idea what the reason might be?


Filebeat version 7.10.1, Logstash version 7.10.1, ES version 7.10.0.

I have enabled debug logging for Logstash:

on filter {:event=>#<LogStash::Event:0x23a9bc8d>}
[2021-01-19T22:13:09,546][DEBUG][logstash.filters.json    ][main][4b26c54f59b453b33ce3542b7d139d33aaf31bfcf8185cddaf9983d6fdac068e] Running json filter {:event=>#<LogStash::Event:0x2143ba96>}
[2021-01-19T22:13:09,546][DEBUG][logstash.filters.json    ][main][4b26c54f59b453b33ce3542b7d139d33aaf31bfcf8185cddaf9983d6fdac068e] Event after json filter {:event=>#<LogStash::Event:0x2143ba96>}
[2021-01-19T22:13:09,547][DEBUG][logstash.filters.json    ][main][4b26c54f59b453b33ce3542b7d139d33aaf31bfcf8185cddaf9983d6fdac068e] Running json filter {:event=>#<LogStash::Event:0x6558b0f4>}
[2021-01-19T22:13:09,547][DEBUG][logstash.filters.json    ][main][4b26c54f59b453b33ce3542b7d139d33aaf31bfcf8185cddaf9983d6fdac068e] Event after json filter {:event=>#<LogStash::Event:0x6558b0f4>}
[2021-01-19T22:13:09,547][DEBUG][logstash.filters.json    ][main][4b26c54f59b453b33ce3542b7d139d33aaf31bfcf8185cddaf9983d6fdac068e] Running json filter {:event=>#<LogStash::Event:0x651bef38>}
[2021-01-19T22:13:09,547][DEBUG][logstash.filters.json    ][main][4b26c54f59b453b33ce3542b7d139d33aaf31bfcf8185cddaf9983d6fdac068e] Event after json filter {:event=>#<LogStash::Event:0x651bef38>}
[2021-01-19T22:13:09,873][DEBUG][org.apache.kafka.clients.FetchSessionHandler][main][24653f85c4bf579b2041409263ccfa549df5a7dcabc6d18b8484a1e85b9bfa97] [Consumer clientId=logstash-0, groupId=logstash] Node 0 sent an incremental fetch response for session 636759397 with 0 response partition(s), 2 implied partition(s)
[2021-01-19T22:13:09,873][DEBUG][org.apache.kafka.clients.consumer.internals.Fetcher][main][24653f85c4bf579b2041409263ccfa549df5a7dcabc6d18b8484a1e85b9bfa97] [Consumer clientId=logstash-0, groupId=logstash] Added READ_UNCOMMITTED fetch request for partition payments-nginx-1 at position FetchPosition{offset=26718738, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=mbkkafka240:9092 (id: 0 rack: null), epoch=-1}} to node mbkkafka240:9092 (id: 0 rack: null)
[2021-01-19T22:13:09,874][DEBUG][org.apache.kafka.clients.consumer.internals.Fetcher][main][24653f85c4bf579b2041409263ccfa549df5a7dcabc6d18b8484a1e85b9bfa97] [Consumer clientId=logstash-0, groupId=logstash] Added READ_UNCOMMITTED fetch request for partition payments-nginx-0 at position FetchPosition{offset=26715192, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=mbkkafka240:9092 (id: 0 rack: null), epoch=-1}} to node mbkkafka240:9092 (id: 0 rack: null)
[2021-01-19T22:13:09,874][DEBUG][org.apache.kafka.clients.FetchSessionHandler][main][24653f85c4bf579b2041409263ccfa549df5a7dcabc6d18b8484a1e85b9bfa97] [Consumer clientId=logstash-0, groupId=logstash] Built incremental fetch (sessionId=636759397, epoch=5580) for node 0. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s) out of 2 partition(s)
[2021-01-19T22:13:09,874][DEBUG][org.apache.kafka.clients.consumer.internals.Fetcher][main][24653f85c4bf579b2041409263ccfa549df5a7dcabc6d18b8484a1e85b9bfa97] [Consumer clientId=logstash-0, groupId=logstash] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(payments-nginx-1, payments-nginx-0)) to broker mbkkafka240:9092 (id: 0 rack: null)
[2021-01-19T22:13:09,875][DEBUG][org.apache.kafka.clients.NetworkClient][main][24653f85c4bf579b2041409263ccfa549df5a7dcabc6d18b8484a1e85b9bfa97] [Consumer clientId=logstash-0, groupId=logstash] Using older server API v10 to send FETCH {replica_id=-1,max_wait_time=500,min_bytes=1,max_bytes=52428800,isolation_level=0,session_id=636759397,session_epoch=5580,topics=[],forgotten_topics_data=[]} with correlation id 5963 to node 0
Error parsing json {:source=>"message", :raw=>{"event"=>{"module"=>"nginx", "dataset"=>"nginx.access", "timezone"=>"+05:30"}, "log"=>{"offset"=>17947386523, "file"=>{"path"=>"/var/log/nginx/new_json.log"}}, "host"=>{"name"=>"mbkapp57", "os"=>{"version"=>"7 (Core)", "name"=>"CentOS Linux", "platform"=>"centos", "family"=>"redhat", "kernel"=>"3.10.0-327.13.1.el7.x86_64", "codename"=>"Core"}, "architecture"=>"x86_64", "containerized"=>false, "ip"=>["10.10.10.82"], "id"=>"92278feb4ab04110b9b72833eb1bf548", "mac"=>["22:77:85:f2:da:96", "d2:ea:9f:33:51:5c", "aa:d2:1d:af:62:49", "ba:c5:e6:af:17:ce"], "hostname"=>"mbkapp57"}, "message"=>"{\"@timestamp\":\"2021-01-20T00:05:31+05:30\",\"host\":\"mbkapp57\",\"server_ip\":\"10.10.10.82\",\"client_ip\":\"45.119.57.164\",\"xff\":\"-\",\"domain\":\"appapi.mobikwik.com\",\"url\":\"/p/upgradewallet/v3/kycConsent\",\"referer\":\"-\",\"args\":\"-\",\"upstreamtime\":\"0.026\",\"responsetime\":\"0.026\",\"request_method\":\"GET\",\"status\":\"200\",\"size\":\"129\",\"request_body\":\"-\",\"request_length\":\"425\",\"protocol\":\"HTTP/1.1\",\"upstreamhost\":\"10.10.10.159:8080\",\"file_dir\":\"/usr/share/nginx/html/p/upgradewallet/v3/kycConsent\",\"http_user_agent\":\"Mobikwik/22 CFNetwork/1209 Darwin/20.2.0\"}", "@timestamp"=>"2021-01-19T18:35:31.410Z", "agent"=>{"version"=>"7.10.1", "ephemeral_id"=>"2d565807-b19c-40c2-a092-e66ee5dd4f9b", "name"=>"mbkapp57", "type"=>"filebeat", "hostname"=>"mbkapp57", "id"=>"c9d8d92d-31be-4996-99d8-aae6680c1570"}, "fileset"=>{"name"=>"access"}, "ecs"=>{"version"=>"1.5.0"}, "@metadata"=>{"beat"=>"filebeat", "version"=>"7.10.1", "pipeline"=>"filebeat-7.10.1-nginx-access-pipeline", "type"=>"_doc"}, "input"=>{"type"=>"log"}, "service"=>{"type"=>"nginx"}}, :exception=>java.lang.ClassCastException}
[root@es02 logstash]# 
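
If I read the error above correctly, the nginx JSON still arrives as a plain string inside the message field of the Filebeat event, so the codec => json on the Kafka input only decodes the outer Filebeat envelope, not the inner nginx line. A sketch of the kind of json filter I assume would be needed to expand it (untested in this pipeline):

filter {
  # Decode the inner nginx JSON that is still a string in "message";
  # with no target set, fields like status, client_ip and responsetime
  # would land at the top level of the event.
  json {
    source => "message"
  }
  # Drop the raw string once it has been parsed.
  mutate {
    remove_field => [ "message" ]
  }
}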

I changed my approach slightly by switching the nginx log pattern to the following format:

log_format custom  '$remote_addr [$time_local] '
                           '$request $status $body_bytes_sent '
                           '$http_referer $http_user_agent $host $upstream_addr $request_time';
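
For reference, a line produced by this format would look roughly like the following (hypothetical values, chosen so that the grok pattern further down matches):

203.0.113.10 [20/Jan/2021:00:05:31 +0530] GET /p/upgradewallet/v3/kycConsent HTTP/1.1 200 129 - Mobikwik/22 appapi.example.com 10.10.10.159:8080 0.026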

In addition, the Logstash config reads the nginx logs from Kafka and turns them into JSON by applying a grok pattern. Logstash config file:

input {
  kafka {
    bootstrap_servers => "10.X.X.X:9092"
    topics => ["payments"]
  }
}
filter {
  grok {
    match => { "message" => "%{IPORHOST:remote_ip} \[%{HTTPDATE:time}\] %{WORD:http_method} %{DATA:url} HTTP/%{NUMBER:http_version} %{NUMBER:response_code} %{NUMBER:body_sent_bytes} %{DATA:referrer} %{DATA:agent} %{IPORHOST:domain} %{NOTSPACE:upstream_address} %{NUMBER:upstream_time}" }
  }
  date {
    match => [ "time", "dd/MMM/YYYY:H:m:s Z" ]
    target => "my_date_as_date"
  }
  geoip {
    source => "remote_ip"
    add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
    add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
  }
  mutate {
    add_field => { "host_nginx" => "X.X.X.82" }
    add_field => { "read_time" => "%{@timestamp}" }
    convert => ["body_sent_bytes", "integer"]
    convert => ["upstream_time", "float"]
    remove_field => [ "message", "[geoip][location][lat]", "[geoip][location][lon]", "[geoip][country_code3]" ]
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    manage_template => false
    index => "paymentapi-nginx-%{+YYYY.MM.dd}"
  }
}
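
To check whether remote_ip, response_code and the geoip fields are actually being produced before the documents reach Elasticsearch, I plan to temporarily add a stdout output next to the elasticsearch one (same idea as the commented-out line in the first config above), roughly:

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    manage_template => false
    index => "paymentapi-nginx-%{+YYYY.MM.dd}"
  }
  # Temporary: print each parsed event to the console so the grok/geoip
  # results can be inspected directly.
  stdout { codec => rubydebug }
}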