Fluentd 过滤器以排除具有空值的键
Fluentd filter to exclude key with empty value
我想排除 serive_name 为空的行 "service_name":""
。
这是我的流利的 conf
## match tag=debug.** and dump to console
<match debug.**>
@type stdout
</match>
<source>
@type tail
path /opt/wso2esb-4.9.0-wkr-1/repository/logs/wso2carbon.log
pos_file /var/log/td-agent/tmp/wso2carbon.log.pos
tag debug.wso2.esb
format /^([TID:]* [^ ]* [^ ]* \[(?<time>[^\]]*)\]) ([^ ]* (?<level>[^ ]*))([^***]*[^=]*[^ ]*(?<service_name>[^,]*)[^=]*[^ ]*(?<step>[^,]*)[^ ]*[^=]*[^ ]*(?<message_id>[^,]*))/
time_format %Y-%m-%d %H:%M:%S
# keep_time_key true
</source>
这是日志输出
2017-08-21 09:57:10 +0700 debug.wso2.esb: {"level":"INFO","service_name":" SA_VasGWLogSeq","step":" before vasgwInsertlog","message_id":" urn:uuid:2046f0ed-690d-47b1-aa86-d4a71c021a74"}
2017-08-21 09:57:10 +0700 debug.wso2.esb: {"level":"INFO","service_name":"","step":"","message_id":""}
2017-08-21 09:57:10 +0700 debug.wso2.esb: {"level":"INFO","service_name":" SA_VasGWLogSeq","step":" after vasgwInsertlog","message_id":" urn:uuid:2046f0ed-690d-47b1-aa86-d4a71c021a74"}
2017-08-21 10:16:10 +0700 debug.wso2.esb: {"level":"INFO","service_name":" SERVICE_NAME","step":" Before - SA_ServiceApiDSEp","message_id":" urn:uuid:39e0ecc1-dda5-4cd9-91fc-90e7ed4f5233"}
我想排除下面的行。怎么做?
{"level":"INFO","service_name":"","step":"","message_id":""}
第二个问题是当我尝试Fluentular I get a nice output without space.[=19=时为什么在值"service_name":" SERVICE_NAME"
之前有一个space ]
我通过在正则表达式中添加 space 解决了第二个任务。例如变化
[^=]*[^ ]*(?<service_name>[^,]*)
到 [^=]*[^ ]* (?<service_name>[^,]*)
.
但我不知道 如何编写过滤器来排除 key_name 具有空值的记录,例如 "service_name":""
.
因为找不到排除键为空值的记录的解决方案,所以我使用了反向解决方案。我使用 grep 来保存指定键值的记录。请参阅下面我的 Fluentd 配置。
每个 WSO2 节点上的 Fluentd。
#############################################################################################
# Fluentd Configuration File #
# #
# In v1 configuration, type and id are @ prefix parameters. #
# @type and @id are recommended. type and id are still available for backward compatibility #
#############################################################################################
################################
# Source #
################################
## built-in TCP input
## $ echo <json> | fluent-cat <tag>
<source>
@type forward
@id forward_input
port 24224
</source>
# Listen DRb for debug
<source>
@type debug_agent
@id debug_agent_input
bind 127.0.0.1
port 24230
</source>
# HTTP input
# http://localhost:8888/<tag>?json=<json>
#<source>
# @type http
# @id http_input
# port 8888
#</source>
# Listen HTTP for monitoring
# http://localhost:24220/api/plugins
# http://localhost:24220/api/plugins?type=TYPE
# http://localhost:24220/api/plugins?tag=MYTAG
<source>
@type monitor_agent
@id monitor_agent_input
port 24220
</source>
<source>
@type tail
path /opt/wso2esb-4.9.0-wkr-1/repository/logs/wso2carbon.log
pos_file /cellcard/fluent/wso2carbon.log.pos
tag wso2.esb.service.test
format /^([TID:]+ [^ ]+ [^ ]+ \[(?<time>[^\]]+)\]) ([^***]+[^=]+[^ ]+(?<transaction_id>[^,]*)[^=]+[^ ]+(?<service_name>[^,]*)[^=]+[^ ]+(?<data>[^,]*))/
time_format %Y-%m-%d %H:%M:%S
keep_time_key true
</source>
<source>
@type tail
path /opt/wso2esb-4.9.0-wkr-1/repository/logs/wso2carbon.log
pos_file /cellcard/fluent/wso2carbon.log.pos
tag wso2.esb.ne.surepay
format /^([TID:]+ [^ ]+ [^ ]+ \[(?<time>[^\]]+)\]) ([^***]+[^=]+[^ ]+(?<service_name>[^,]*)[^=]+[^ ]+(?<transaction_id>[^,]*)[^<?]+(?<payload>[^{]*))/
time_format %Y-%m-%d %H:%M:%S
keep_time_key true
</source>
<source>
@type tail
path /opt/wso2esb-4.9.0-wkr-1/repository/logs/wso2carbon.log
pos_file /cellcard/fluent/wso2carbon.log.pos
tag wso2.esb.surepay.trigger
format /^([TID:]+ [^ ]+ [^ ]+ \[(?<time>[^\]]+)\]) ([^*]+[^=]+[^ ]+(?<client_ip>[^,]*)[^=]+[^ ]+(?<service_name>[^,]*)[^=]+[^ ]+(?<req_id>[^,]*)[^=]+[^ ]+(?<content_massage>[^,]*)[^=]+[^ ]+)/
time_format %Y-%m-%d %H:%M:%S
keep_time_key true
</source>
###########################
# Filter #
###########################
<filter wso2.esb.service.**>
@type grep
<regexp>
key service_name
pattern ^\sNew
</regexp>
</filter>
<filter wso2.esb.service.**>
@type record_transformer
enable_ruby
<record>
data ${record["data"].strip.split(";").each_slice(2).to_h.to_json}
</record>
</filter>
<filter wso2.esb.service.**>
@type parser
format json
key_name data
</filter>
<filter wso2.esb.ne.surepay>
@type grep
<regexp>
key service_name
pattern ^\sNE_SurePay
</regexp>
</filter>
<filter wso2.esb.ne.surepay>
@type record_transformer
enable_ruby
<record>
service_name ${record["service_name"].strip!}
transaction_id ${record["transaction_id"].strip!}
payload ${record["payload"].strip!}
</record>
</filter>
<filter wso2.esb.surepay.trigger>
@type grep
<regexp>
key service_name
pattern ^\sSurePayPassiveTrigger
</regexp>
</filter>
<filter wso2.esb.surepay.trigger>
@type record_transformer
enable_ruby
<record>
client_ip ${record["client_ip"].strip!}
service_name ${record["service_name"].strip!}
req_id ${record["req_id"].strip!}
content_massage ${record["content_massage"].strip!}
</record>
</filter>
###########################
# Output #
###########################
## Debug
## match tag=debug.** and dump to console
<match debug.**>
@type stdout
@id stdout_output
</match>
## ESB Service Log
## match tag=wso2.esb.**. Forward to Fluentd Collector (, stdout for debug) and write to file
<match wso2.esb.**>
@type copy
<store>
@type forward
@id forward_output
buffer_path /cellcard/fluent/buffer/fluentd.forward
buffer_type file
flush_interval 10
send_timeout 60
heartbeat_type tcp
heartbeat_interval 20
<server>
host 172.16.100.243
port 24224
</server>
## If have sencondary fluentd server for fail-over, enable <secondary> block
# <secondary>
# <server>
# host 192.168.0.12
# </server>
# </secondary>
</store>
<store>
@type file
@id file_output
path /cellcard/fluent/log/wso2
time_slice_format %Y%m%d%H
time_slice_wait 10m
time_format %Y-%m-%d %H:%M:%S%z
</store>
<store>
@type stdout
</store>
</match>
Fluentd 收集器(从每个节点上的所有 fluentd 收集数据):
#############################################################################################
# Fluentd Server Configuration File #
# #
# In v1 configuration, type and id are @ prefix parameters. #
# @type and @id are recommended. type and id are still available for backward compatibility #
#############################################################################################
################################
# Source #
################################
## built-in TCP input
## $ echo <json> | fluent-cat <tag>
<source>
@type forward
@id forward_input
port 24224
</source>
# Listen DRb for debug
<source>
@type debug_agent
@id debug_agent_input
bind 127.0.0.1
port 24230
</source>
# HTTP input
# http://localhost:8888/<tag>?json=<json>
#<source>
# @type http
# @id http_input
# port 8888
#</source>
# Listen HTTP for monitoring
# http://localhost:24220/api/plugins
# http://localhost:24220/api/plugins?type=TYPE
# http://localhost:24220/api/plugins?tag=MYTAG
<source>
@type monitor_agent
@id monitor_agent_input
port 24220
</source>
###########################
# Filter #
###########################
# <filter wso2.esb.service.**>
# @type grep
# <regexp>
# key service_name
# pattern ^New
# </regexp>
# </filter>
# <filter wso2.esb.ne.surepay>
# @type grep
# <regexp>
# key service_name
# pattern ^NE_SurePay
# </regexp>
# </filter>
# <filter wso2.esb.ne.surepay>
# @type grep
# <regexp>
# key service_name
# pattern ^SurePayPassiveTrigger
# </regexp>
# </filter>
###########################
# Output #
###########################
## Debug
## match tag=debug.** and dump to console
<match debug.**>
@type stdout
@id stdout_output
</match>
## ESB Service Log
## match tag=wso2.esb.service.** and insert into database (, stdout for debug) and write to file
<match wso2.esb.**>
@type copy
<store>
@type sql
buffer_path /cellcard/fluent/buffer/fluentd.sql
buffer_type file
flush_interval 10
host {ORACLE_HOST}
port 1521
database {ORACLE_DATABASE}
adapter oracle_enhanced
username {ORACLE_USERNAME}
password {ORACLE_PADDWORD}
<table>
table {TABLE_NAME}
column_mapping 'insert_date:insert_date,transaction_id:transaction_id,service_name:service_name,process_step:process_step,msisdn:msisdn,command:command,transaction_type:transaction_type,action:action,service_price:service_price,subcriber_type:subcriber_type,transaction_status:transaction_status,notification:notification,remark:remark,vas_error_code:vas_error_code,client_username:client_username,client_ip:client_ip,api_url:api_url,api_method:api_method,nei_name:nei_name,nei_error_code:nei_error_code,server_host:server_host'
# This is the default table because it has no "pattern" argument in <table>
# The logic is such that if all non-default <table> blocks
# do not match, the default one is chosen.
# The default table is required.
</table>
<table wso2.esb.service.test>
table {TABLE_NAME}
column_mapping 'insert_date:insert_date,transaction_id:transaction_id,service_name:service_name,process_step:process_step,msisdn:msisdn,command:command,transaction_type:transaction_type,action:action,service_price:service_price,subcriber_type:subcriber_type,transaction_status:transaction_status,notification:notification,remark:remark,vas_error_code:vas_error_code,client_username:client_username,client_ip:client_ip,api_url:api_url,api_method:api_method,nei_name:nei_name,nei_error_code:nei_error_code,server_host:server_host'
</table>
<table wso2.esb.ne.surepay>
table {TABLE_NAME}
column_mapping 'time:insert_date,transaction_id:transaction_id,service_name:service_name,payload:payload'
</table>
<table wso2.esb.surepay.trigger>
table {TABLE_NAME}
column_mapping 'time:insert_date,client_ip:client_ip,service_name:service_name,req_id:req_id,content_massage:content_massage'
</table>
</store>
<store>
@type file
path /cellcard/fluent/log/service
time_slice_format %Y%m%d%H
time_slice_wait 10m
time_format %Y-%m-%d %H:%M:%S%z
</store>
<store>
@type stdout
</store>
</match>
注意:我使用 frontd 从 WSO2 跟踪日志,然后插入到 Oracle 数据库中。
平台: RedHat 7,ruby 2.4.1p111,流利的 0.12.40,activerecord-oracle_enhanced-adapter (1.8.2),ruby -oci8 (2.2.5), fluent-plugin-sql (0.6.1).
更新
我已经在 GitHub 上发布了所有配置和安装细节
https://github.com/oemdaro/fluent-oracle-example
这似乎是对 grep
过滤器插件的 exclude
指令的相当简单的使用。
使用"start"(^
)匹配一个空消息并排除它,然后没有任何内容并结束($
)可以通过以下方式完成。
<filter **>
@type grep
<exclude>
key service_name
pattern /^$/
# or, to exclude all messages that are empty or include only white-space:
# pattern /^\s*$/
</exclude>
</filter>
请注意,0.12 和 1.x 之间的正则表达式符号发生了变化(现在使用前导和尾随斜线)。
我想排除 serive_name 为空的行 "service_name":""
。
这是我的流利的 conf
## match tag=debug.** and dump to console
<match debug.**>
@type stdout
</match>
<source>
@type tail
path /opt/wso2esb-4.9.0-wkr-1/repository/logs/wso2carbon.log
pos_file /var/log/td-agent/tmp/wso2carbon.log.pos
tag debug.wso2.esb
format /^([TID:]* [^ ]* [^ ]* \[(?<time>[^\]]*)\]) ([^ ]* (?<level>[^ ]*))([^***]*[^=]*[^ ]*(?<service_name>[^,]*)[^=]*[^ ]*(?<step>[^,]*)[^ ]*[^=]*[^ ]*(?<message_id>[^,]*))/
time_format %Y-%m-%d %H:%M:%S
# keep_time_key true
</source>
这是日志输出
2017-08-21 09:57:10 +0700 debug.wso2.esb: {"level":"INFO","service_name":" SA_VasGWLogSeq","step":" before vasgwInsertlog","message_id":" urn:uuid:2046f0ed-690d-47b1-aa86-d4a71c021a74"}
2017-08-21 09:57:10 +0700 debug.wso2.esb: {"level":"INFO","service_name":"","step":"","message_id":""}
2017-08-21 09:57:10 +0700 debug.wso2.esb: {"level":"INFO","service_name":" SA_VasGWLogSeq","step":" after vasgwInsertlog","message_id":" urn:uuid:2046f0ed-690d-47b1-aa86-d4a71c021a74"}
2017-08-21 10:16:10 +0700 debug.wso2.esb: {"level":"INFO","service_name":" SERVICE_NAME","step":" Before - SA_ServiceApiDSEp","message_id":" urn:uuid:39e0ecc1-dda5-4cd9-91fc-90e7ed4f5233"}
我想排除下面的行。怎么做?
{"level":"INFO","service_name":"","step":"","message_id":""}
第二个问题是当我尝试Fluentular I get a nice output without space.[=19=时为什么在值"service_name":" SERVICE_NAME"
之前有一个space ]
我通过在正则表达式中添加 space 解决了第二个任务。例如变化
[^=]*[^ ]*(?<service_name>[^,]*)
到 [^=]*[^ ]* (?<service_name>[^,]*)
.
但我不知道 如何编写过滤器来排除 key_name 具有空值的记录,例如 "service_name":""
.
因为找不到排除键为空值的记录的解决方案,所以我使用了反向解决方案。我使用 grep 来保存指定键值的记录。请参阅下面我的 Fluentd 配置。
每个 WSO2 节点上的 Fluentd。
#############################################################################################
# Fluentd Configuration File #
# #
# In v1 configuration, type and id are @ prefix parameters. #
# @type and @id are recommended. type and id are still available for backward compatibility #
#############################################################################################
################################
# Source #
################################
## built-in TCP input
## $ echo <json> | fluent-cat <tag>
<source>
@type forward
@id forward_input
port 24224
</source>
# Listen DRb for debug
<source>
@type debug_agent
@id debug_agent_input
bind 127.0.0.1
port 24230
</source>
# HTTP input
# http://localhost:8888/<tag>?json=<json>
#<source>
# @type http
# @id http_input
# port 8888
#</source>
# Listen HTTP for monitoring
# http://localhost:24220/api/plugins
# http://localhost:24220/api/plugins?type=TYPE
# http://localhost:24220/api/plugins?tag=MYTAG
<source>
@type monitor_agent
@id monitor_agent_input
port 24220
</source>
<source>
@type tail
path /opt/wso2esb-4.9.0-wkr-1/repository/logs/wso2carbon.log
pos_file /cellcard/fluent/wso2carbon.log.pos
tag wso2.esb.service.test
format /^([TID:]+ [^ ]+ [^ ]+ \[(?<time>[^\]]+)\]) ([^***]+[^=]+[^ ]+(?<transaction_id>[^,]*)[^=]+[^ ]+(?<service_name>[^,]*)[^=]+[^ ]+(?<data>[^,]*))/
time_format %Y-%m-%d %H:%M:%S
keep_time_key true
</source>
<source>
@type tail
path /opt/wso2esb-4.9.0-wkr-1/repository/logs/wso2carbon.log
pos_file /cellcard/fluent/wso2carbon.log.pos
tag wso2.esb.ne.surepay
format /^([TID:]+ [^ ]+ [^ ]+ \[(?<time>[^\]]+)\]) ([^***]+[^=]+[^ ]+(?<service_name>[^,]*)[^=]+[^ ]+(?<transaction_id>[^,]*)[^<?]+(?<payload>[^{]*))/
time_format %Y-%m-%d %H:%M:%S
keep_time_key true
</source>
<source>
@type tail
path /opt/wso2esb-4.9.0-wkr-1/repository/logs/wso2carbon.log
pos_file /cellcard/fluent/wso2carbon.log.pos
tag wso2.esb.surepay.trigger
format /^([TID:]+ [^ ]+ [^ ]+ \[(?<time>[^\]]+)\]) ([^*]+[^=]+[^ ]+(?<client_ip>[^,]*)[^=]+[^ ]+(?<service_name>[^,]*)[^=]+[^ ]+(?<req_id>[^,]*)[^=]+[^ ]+(?<content_massage>[^,]*)[^=]+[^ ]+)/
time_format %Y-%m-%d %H:%M:%S
keep_time_key true
</source>
###########################
# Filter #
###########################
<filter wso2.esb.service.**>
@type grep
<regexp>
key service_name
pattern ^\sNew
</regexp>
</filter>
<filter wso2.esb.service.**>
@type record_transformer
enable_ruby
<record>
data ${record["data"].strip.split(";").each_slice(2).to_h.to_json}
</record>
</filter>
<filter wso2.esb.service.**>
@type parser
format json
key_name data
</filter>
<filter wso2.esb.ne.surepay>
@type grep
<regexp>
key service_name
pattern ^\sNE_SurePay
</regexp>
</filter>
<filter wso2.esb.ne.surepay>
@type record_transformer
enable_ruby
<record>
service_name ${record["service_name"].strip!}
transaction_id ${record["transaction_id"].strip!}
payload ${record["payload"].strip!}
</record>
</filter>
<filter wso2.esb.surepay.trigger>
@type grep
<regexp>
key service_name
pattern ^\sSurePayPassiveTrigger
</regexp>
</filter>
<filter wso2.esb.surepay.trigger>
@type record_transformer
enable_ruby
<record>
client_ip ${record["client_ip"].strip!}
service_name ${record["service_name"].strip!}
req_id ${record["req_id"].strip!}
content_massage ${record["content_massage"].strip!}
</record>
</filter>
###########################
# Output #
###########################
## Debug
## match tag=debug.** and dump to console
<match debug.**>
@type stdout
@id stdout_output
</match>
## ESB Service Log
## match tag=wso2.esb.**. Forward to Fluentd Collector (, stdout for debug) and write to file
<match wso2.esb.**>
@type copy
<store>
@type forward
@id forward_output
buffer_path /cellcard/fluent/buffer/fluentd.forward
buffer_type file
flush_interval 10
send_timeout 60
heartbeat_type tcp
heartbeat_interval 20
<server>
host 172.16.100.243
port 24224
</server>
## If have sencondary fluentd server for fail-over, enable <secondary> block
# <secondary>
# <server>
# host 192.168.0.12
# </server>
# </secondary>
</store>
<store>
@type file
@id file_output
path /cellcard/fluent/log/wso2
time_slice_format %Y%m%d%H
time_slice_wait 10m
time_format %Y-%m-%d %H:%M:%S%z
</store>
<store>
@type stdout
</store>
</match>
Fluentd 收集器(从每个节点上的所有 fluentd 收集数据):
#############################################################################################
# Fluentd Server Configuration File #
# #
# In v1 configuration, type and id are @ prefix parameters. #
# @type and @id are recommended. type and id are still available for backward compatibility #
#############################################################################################
################################
# Source #
################################
## built-in TCP input
## $ echo <json> | fluent-cat <tag>
<source>
@type forward
@id forward_input
port 24224
</source>
# Listen DRb for debug
<source>
@type debug_agent
@id debug_agent_input
bind 127.0.0.1
port 24230
</source>
# HTTP input
# http://localhost:8888/<tag>?json=<json>
#<source>
# @type http
# @id http_input
# port 8888
#</source>
# Listen HTTP for monitoring
# http://localhost:24220/api/plugins
# http://localhost:24220/api/plugins?type=TYPE
# http://localhost:24220/api/plugins?tag=MYTAG
<source>
@type monitor_agent
@id monitor_agent_input
port 24220
</source>
###########################
# Filter #
###########################
# <filter wso2.esb.service.**>
# @type grep
# <regexp>
# key service_name
# pattern ^New
# </regexp>
# </filter>
# <filter wso2.esb.ne.surepay>
# @type grep
# <regexp>
# key service_name
# pattern ^NE_SurePay
# </regexp>
# </filter>
# <filter wso2.esb.ne.surepay>
# @type grep
# <regexp>
# key service_name
# pattern ^SurePayPassiveTrigger
# </regexp>
# </filter>
###########################
# Output #
###########################
## Debug
## match tag=debug.** and dump to console
<match debug.**>
@type stdout
@id stdout_output
</match>
## ESB Service Log
## match tag=wso2.esb.service.** and insert into database (, stdout for debug) and write to file
<match wso2.esb.**>
@type copy
<store>
@type sql
buffer_path /cellcard/fluent/buffer/fluentd.sql
buffer_type file
flush_interval 10
host {ORACLE_HOST}
port 1521
database {ORACLE_DATABASE}
adapter oracle_enhanced
username {ORACLE_USERNAME}
password {ORACLE_PADDWORD}
<table>
table {TABLE_NAME}
column_mapping 'insert_date:insert_date,transaction_id:transaction_id,service_name:service_name,process_step:process_step,msisdn:msisdn,command:command,transaction_type:transaction_type,action:action,service_price:service_price,subcriber_type:subcriber_type,transaction_status:transaction_status,notification:notification,remark:remark,vas_error_code:vas_error_code,client_username:client_username,client_ip:client_ip,api_url:api_url,api_method:api_method,nei_name:nei_name,nei_error_code:nei_error_code,server_host:server_host'
# This is the default table because it has no "pattern" argument in <table>
# The logic is such that if all non-default <table> blocks
# do not match, the default one is chosen.
# The default table is required.
</table>
<table wso2.esb.service.test>
table {TABLE_NAME}
column_mapping 'insert_date:insert_date,transaction_id:transaction_id,service_name:service_name,process_step:process_step,msisdn:msisdn,command:command,transaction_type:transaction_type,action:action,service_price:service_price,subcriber_type:subcriber_type,transaction_status:transaction_status,notification:notification,remark:remark,vas_error_code:vas_error_code,client_username:client_username,client_ip:client_ip,api_url:api_url,api_method:api_method,nei_name:nei_name,nei_error_code:nei_error_code,server_host:server_host'
</table>
<table wso2.esb.ne.surepay>
table {TABLE_NAME}
column_mapping 'time:insert_date,transaction_id:transaction_id,service_name:service_name,payload:payload'
</table>
<table wso2.esb.surepay.trigger>
table {TABLE_NAME}
column_mapping 'time:insert_date,client_ip:client_ip,service_name:service_name,req_id:req_id,content_massage:content_massage'
</table>
</store>
<store>
@type file
path /cellcard/fluent/log/service
time_slice_format %Y%m%d%H
time_slice_wait 10m
time_format %Y-%m-%d %H:%M:%S%z
</store>
<store>
@type stdout
</store>
</match>
注意:我使用 frontd 从 WSO2 跟踪日志,然后插入到 Oracle 数据库中。
平台: RedHat 7,ruby 2.4.1p111,流利的 0.12.40,activerecord-oracle_enhanced-adapter (1.8.2),ruby -oci8 (2.2.5), fluent-plugin-sql (0.6.1).
更新 我已经在 GitHub 上发布了所有配置和安装细节 https://github.com/oemdaro/fluent-oracle-example
这似乎是对 grep
过滤器插件的 exclude
指令的相当简单的使用。
使用"start"(^
)匹配一个空消息并排除它,然后没有任何内容并结束($
)可以通过以下方式完成。
<filter **>
@type grep
<exclude>
key service_name
pattern /^$/
# or, to exclude all messages that are empty or include only white-space:
# pattern /^\s*$/
</exclude>
</filter>
请注意,0.12 和 1.x 之间的正则表达式符号发生了变化(现在使用前导和尾随斜线)。