Fluentd 可以将日志发送到 Logstash 吗?

Can Fluentd send logs to Logstash?

我一整天都在尝试这样做。我想通过 fluentd 日志引擎将日志从 Docker 发送到 FluentD,然后从 fluentd 将这些日志发送到 logstash 进行处理。

虽然我一直从 logstash 收到此错误:

{:timestamp=>"2016-03-09T23:29:19.388000+0000",
 :message=>"An error occurred. Closing connection",
 :client=>"172.18.0.1:57259", :exception=>#<TypeError: can't convert String into Integer>,
 :backtrace=>["org/jruby/RubyTime.java:1073:in `at'", 
"/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-core-event-2.2.2-java/lib/logstash/timestamp.rb:27:in `at'", 
"/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-codec-fluent-2.0.2-java/lib/logstash/codecs/fluent.rb:41:in `decode'", 
"org/msgpack/jruby/MessagePackLibrary.java:195:in `each'", 
"/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-codec-fluent-2.0.2-java/lib/logstash/codecs/fluent.rb:40:in `decode'", 
"/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-tcp-3.0.2/lib/logstash/inputs/tcp.rb:153:in `handle_socket'", 
"/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-tcp-3.0.2/lib/logstash/inputs/tcp.rb:143:in `server_connection_thread'"], :level=>:error}

相当基本的 logstash 配置:

input {
  tcp {
    port => 4000
    codec => "fluent"
  }
}

output {
  stdout {
  }
}

相当基本的 fluentd 配置:

<source>
  @type forward
</source>


<match docker.json>
  @type forward
  send_timeout 60s 
  recover_wait 10s 
  heartbeat_type none
  phi_threshold 16
  hard_timeout 60s 

  <server>
    name logstash
    host 172.18.0.2
    port 4000
    weight 60
  </server>
</match>

<match docker.**>
  @type stdout
</match>

有人会认为这行得通,但我已经发现 Logstash 行不通:

  1. 使用 fluentd 的 forward_out 心跳配置。
    • Logstash 不在与 TCP 相同的端口上打开 UDP 端口。
  2. 以上错误。

如果我在 Ruby 中制作 Fluentd 消息包消息并将它们发送给 manually.The 密钥,则上述配置确实有效,但我希望 Fluentd 在本地管理日志并将它们发送到外部 logstash 服务器将消息正确处理为 JSON.

据我所知,无法将数据从 Fluentd 传输到 Logstash。我们需要编写任何 Fluentd 输出插件以将数据发送到 Logstash,或编写任何 Logstash 输入插件以从 Fluentd 接收数据。

仅供参考:有一些用于 Logstash -> Fluentd 方向的插件:

  • fluent-plugin-beats(Elastic beats 协议的 fluentd 输入插件)
  • logstash-output-fluentd(将数据发送到 Fluentd 的 logstash 输出插件)

您可以直接将其转发到 logstash tcp 输入。

open-source flunetd output plugin 将以 json 格式(也支持 ssl/tls)将数据直接发送到 logstash tcp 输入(或任何其他接收器)。

第一次看到这里

我们找到了使 fluent -> logstash 工作的方法。设置 time_as_integer true。 fluentd 方面的最小配置是

<source>
  @type http
  @id input_http
  port 8888
</source>

<match **>
  @type forward
  time_as_integer true
  <server>
    host localhost
    port 24114
  </server>
</match>

它在 https://docs.fluentd.org/v0.12/articles/in_forward#i-got-messagepackunknownexttypeerror-error-why 中提到得很隐蔽。 在 logstash 端,使用最新版本(6.2.4),然后简单地配置流畅的编解码器,tcp 输入如下:

input {
  tcp {
    codec => fluent
    port => 24114
  }
}

filter {
}

output {
  stdout { codec => rubydebug }
}

测试

curl -X POST -d 'json={"json":"message"}' http://localhost:8888/debug.test

如文档中所述。使用 time_as_integer 设置,logstash 输出看起来不错,就像。

{
          "port" => 32844,
      "@version" => "1",
          "host" => "localhost",
          "json" => "message",
    "@timestamp" => 2018-04-26T15:14:28.000Z,
          "tags" => [
        [0] "debug.test"
    ]
}

没有它,我得到

[2018-04-26T15:16:00,115][ERROR][logstash.codecs.fluent   ] Fluent parse error, original data now in message field {:error=>#<MessagePack::UnknownExtTypeError: unexpected extension type>, :data=>["fluent.info", "\x92\xD7\u0000Z\xE1\xEC\xF4\u0006$\x96傦worker\u0000\xA7message\xD9&fluentd worker is now running worker=0", {"size"=>1, "compressed"=>"text"}]}
{
          "port" => 32972,
      "@version" => "1",
       "message" => [
        [0] "fluent.info",
        [1] "\x92\xD7\u0000Z\xE1\xEC\xF4\u0006$\x96傦worker\u0000\xA7message\xD9&fluentd worker is now running worker=0",
        [2] {
                  "size" => 1,
            "compressed" => "text"
        }
    ],
          "host" => "localhost",
    "@timestamp" => 2018-04-26T15:16:00.116Z,
          "tags" => [
        [0] "_fluentparsefailure"
    ]
}