Filebeat 5.0 output to Kafka multiple topics
I have Filebeat 5.0 installed on my application server, with 3 Filebeat prospectors, each pointing to a different log path and outputting to a single Kafka topic called myapp_applog. Everything works fine.
My Filebeat output configuration to a single topic - Working
output.kafka:
  # initial brokers for reading cluster metadata
  hosts: ["broker.1.ip.address:9092", "broker.2.ip.address:9092", "broker.3.ip.address:9092"]

  # message topic selection + partitioning
  topic: 'myapp_applog'
  partition.round_robin:
    reachable_only: false

  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
What I want to do is send each log file to a separate topic based on a condition; see the documentation section on topics. I have tried this, but no data is sent to any of the topics. Does anyone know why my conditions don't match, or whether they are even correct? I can't seem to find an example of how to use the "topics topic condition" properly.
Here is my Kafka output configuration for multiple topics.
Not working
output.kafka:
  # initial brokers for reading cluster metadata
  hosts: ["broker.1.ip.address:9092", "broker.2.ip.address:9092", "broker.3.ip.address:9092"]

  # message topic selection + partitioning
  topics:
    - topic: 'myapp_applog'
      when:
        equals:
          document_type: applog_myappapi
    - topic: 'myapp_applog_stats'
      when:
        equals:
          document_type: applog_myappapi_stats
    - topic: 'myapp_elblog'
      when:
        equals:
          document_type: elblog_myappapi
  partition.round_robin:
    reachable_only: false

  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
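One thing that may be worth checking here (an assumption on my part about Filebeat 5.x behaviour, not something confirmed in the question): the prospector option document_type is published into each event as the type field, so an equals condition on document_type may never match. A minimal sketch of the same topics block keyed on type instead:

output.kafka:
  hosts: ["broker.1.ip.address:9092", "broker.2.ip.address:9092", "broker.3.ip.address:9092"]
  topics:
    # document_type: applog_myappapi is published as the event field "type"
    - topic: 'myapp_applog'
      when:
        equals:
          type: applog_myappapi
    - topic: 'myapp_applog_stats'
      when:
        equals:
          type: applog_myappapi_stats
    - topic: 'myapp_elblog'
      when:
        equals:
          type: elblog_myappapi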
Here is the full filebeat.yml configuration file.
################### Filebeat Configuration Example #########################

############################# Filebeat ######################################
filebeat.prospectors:

# App logs - prospector
- input_type: log
  paths:
    - /myapp/logs/myapp.log
  exclude_lines: [".+? INFO[^*].+", ".+? DEBUG[^*].+"]
  exclude_files: [".gz$", ".tmp"]
  fields:
    api: myappapi
    environment: STG
  ignore_older: 24h
  document_type: applog_myappapi
  scan_frequency: 1s
  # Multiline on Timestamp, YYYY-MM-DD
  # https://www.elastic.co/guide/en/beats/filebeat/master/multiline-examples.html
  multiline:
    pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}'
    negate: true
    match: after
    max_lines: 500
    timeout: 5s

# Server Stats - prospector
- input_type: log
  paths:
    - /myapp/logs/serverstats.log
  # Exclude messages with log level
  exclude_lines: [".+? ERROR[^*].+", ".+? DEBUG[^*].+"]
  exclude_files: [".gz$", ".tmp"]
  fields:
    api: myappapi
    environment: STG
  ignore_older: 24h
  document_type: applog_myappapi_stats
  scan_frequency: 1s

# ELB prospector
- input_type: log
  paths:
    - /var/log/httpd/elasticbeanstalk-access_log
  document_type: elblog_myappapi
  fields:
    api: myappapi
    environment: STG
  exclude_lines: [".+? INFO[^*].+", ".+? DEBUG[^*].+"]
  exclude_files: [".gz$", ".tmp"]
  ignore_older: 24h
  # 0s, it is done as often as possible. Default: 10s
  scan_frequency: 1s

registry_file: /var/lib/filebeat/registry

############################# Output ##########################################
# Configure what outputs to use when sending the data collected by the beat.
# Multiple outputs may be used.

#----------------------------- Kafka output --------------------------------
output.kafka:
  # initial brokers for reading cluster metadata
  hosts: ["broker.1.ip.address:9092", "broker.2.ip.address:9092", "broker.3.ip.address:9092"]

  # message topic selection + partitioning
  topics:
    - topic: 'myapp_applog'
      when:
        equals:
          document_type: applog_myappapi
    - topic: 'myapp_applog_stats'
      when:
        equals:
          document_type: applog_myappapi_stats
    - topic: 'myapp_elblog'
      when:
        equals:
          document_type: elblog_myappapi
  partition.round_robin:
    reachable_only: false

  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000

############################# Logging #########################################
# There are three options for the log output: syslog, file, stderr.
# Under Windows systems, the log files are per default sent to the file output,
# under all other systems per default to syslog.
logging:
  # Send all logging output to syslog. On Windows default is false, otherwise
  # default is true.
  to_syslog: true

  # Write all logging output to files. Beats automatically rotate files if rotateeverybytes
  # limit is reached.
  to_files: true

  # To enable logging to files, to_files option has to be set to true
  files:
    # The directory where the log files will be written to.
    path: /var/log/
    # The name of the files where the logs are written to.
    name: filebeats.log
    # Configure log file size limit. If limit is reached, log file will be
    # automatically rotated
    rotateeverybytes: 10485760 # = 10MB
    # Number of rotated log files to keep. Oldest files will be deleted first.
    keepfiles: 7

  # Enable debug output for selected components. To enable all selectors use ["*"]
  # Other available selectors are beat, publish, service
  # Multiple selectors can be chained.
  #selectors: ["*"]

  # Sets log level. The default log level is error.
  # Available log levels are: critical, error, warning, info, debug
  level: info
I ran into the same problem and handled it by defining the output as:
topics:
  - topic: '%{[type]}'
use_type: true
As for the input, you just need to set the Kafka topic in document_type:

- input_type: log
  paths:
    - /path/to/log/file
  document_type: "your kafka topic 1"

- input_type: log
  paths:
    - /path/to/another/log/file
  document_type: "your other kafka topic"
Inputs:
- type: log
  fields:
    kafka_topic: "my_topic_1"
- type: log
  fields:
    kafka_topic: "my_topic_2"
Output:
output.kafka:
  hosts: ["mybroker:9092"]
  topic: '%{[fields.kafka_topic]}'
The above example shows 2 log inputs and 2 Kafka topic outputs.
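For reference, those snippets could be assembled into a self-contained filebeat.yml roughly as follows. This is my own assembly: the type: log / filebeat.inputs syntax shown in this answer is the newer style used from Filebeat 6 onward, and the paths are placeholders.

filebeat.inputs:
- type: log
  paths:
    - /path/to/log/file            # placeholder
  fields:
    kafka_topic: "my_topic_1"
- type: log
  paths:
    - /path/to/another/log/file    # placeholder
  fields:
    kafka_topic: "my_topic_2"

output.kafka:
  hosts: ["mybroker:9092"]
  # route each event to the topic stored in its custom field
  topic: '%{[fields.kafka_topic]}'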