How do I load data from a CSV file into Elasticsearch using Logstash?

I'm trying to insert data into Elasticsearch using Logstash, but I'm stuck. My config file:

logstashCrime.conf
input {
    file {
        path => "C:\elk\sampl.csv"
        start_position => "beginning"
        sincedb_path => "nul"
    }
}

filter {
    csv {
        separator => ","

        columns => ["code","name"]
    }
}

output {
    elasticsearch {
        hosts => ["localhost:9200"]
        index => "crime"
    }
    stdout {
        codec => rubydebug
    }
}

When I try to load the data with logstash-7.2.0\bin\logstash -f c:\elk\logstashCrime.conf, I get this output:
Thread.exclusive is deprecated, use Thread::Mutex
Sending Logstash logs to C:/elk/logstash-7.2.0/logs which is now configured via log4j2.properties
[2019-07-15T16:10:22,300][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2019-07-15T16:10:22,320][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"7.2.0"}
[2019-07-15T16:10:28,817][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://localhost:9200/]}}
[2019-07-15T16:10:29,009][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://localhost:9200/"}
[2019-07-15T16:10:29,058][INFO ][logstash.outputs.elasticsearch] ES Output version determined {:es_version=>7}
[2019-07-15T16:10:29,063][WARN ][logstash.outputs.elasticsearch] Detected a 6.x and above cluster: the `type` event field won't be used to determine the document _type {:es_version=>7}
[2019-07-15T16:10:29,087][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//localhost:9200"]}
[2019-07-15T16:10:29,148][INFO ][logstash.outputs.elasticsearch] Using default mapping template
[2019-07-15T16:10:29,202][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{"index_patterns"=>"logstash-*", "version"=>60001, "settings"=>{"index.refresh_interval"=>"5s", "number_of_shards"=>1}, "mappings"=>{"dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date"}, "@version"=>{"type"=>"keyword"}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}
[2019-07-15T16:10:29,225][WARN ][org.logstash.instrument.metrics.gauge.LazyDelegatingGauge] A gauge metric of an unknown type (org.jruby.specialized.RubyArrayOneObject) has been create for key: cluster_uuids. This may result in invalid serialization.  It is recommended to log an issue to the responsible developer/development team.
[2019-07-15T16:10:29,229][INFO ][logstash.javapipeline    ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>500, :thread=>"#<Thread:0x74421f35 run>"}
[2019-07-15T16:10:30,202][INFO ][logstash.javapipeline    ] Pipeline started {"pipeline.id"=>"main"}
[2019-07-15T16:10:30,408][INFO ][filewatch.observingtail  ] START, creating Discoverer, Watch with file and sincedb collections
[2019-07-15T16:10:30,416][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2019-07-15T16:10:30,755][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}

My sampl.csv file looks like this:

id,name
------
1,john
2,doe
3,you
4,me

I'm new to ELK, so any help is appreciated. I'm on Windows 10. I have successfully created an index with Logstash before, but with this CSV file no index is created.

I want to view the data in Kibana, but because the index is never created, it doesn't show up there.

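You can't use backslashes in the path option of the file input. Use forward slashes instead. The path is treated as a glob pattern, in which a backslash acts as an escape character, so C:\elk\sampl.csv never matches the file and nothing is read.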
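
As a minimal sketch, assuming the file really is at the location given in the question, the input block would look roughly like this. Only the path separators change; the comments are added for illustration.

```
input {
    file {
        # Forward slashes, even on Windows
        path => "C:/elk/sampl.csv"
        start_position => "beginning"
        # Kept from the question: pointing sincedb at the null device
        # means the read position is not remembered between runs
        sincedb_path => "nul"
    }
}
```

With the filter and output sections left as they are, the rows should then appear in the rubydebug output on stdout and in the crime index. The header row and the ------ line will probably be indexed as ordinary events as well, since nothing in the config skips them.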