rabbitmq / logstash 丢失消息
rabbitmq / logstash lost message
我有一个可以成功存储消息的 rabbitmq,但是我读取队列的 logstash 忽略了我的大部分消息。
RabbitMQ 没问题,我有一个小的python脚本来显示所有消息
import pika
i=0
def on_message(channel, method_frame, header_frame, body):
global i
print i
print("Message body", body)
channel.basic_ack(delivery_tag=method_frame.delivery_tag)
i+=1
credentials = pika.PlainCredentials('***', '***')
parameters = pika.ConnectionParameters('***',5672,'logstash', credentials=credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.exchange_declare(exchange="logstash", exchange_type="topic", passive=False, durable=True, auto_delete=False)
channel.queue_declare(queue="hbbtv", auto_delete=False, durable=True)
channel.queue_bind(queue="hbbtv", exchange="logstash", routing_key="hbbtv")
channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_message, 'hbbtv')
try:
channel.start_consuming()
except KeyboardInterrupt:
channel.stop_consuming()
connection.close()
我可以看到我所有的消息
12 ('Message body', '{"message":"212.95.70.118 - -
[25/Feb/2016:11:19:53 +0100] \"GET
/services/web/index.php/OPA/categories/ARTEPLUS7/fr HTTP/1.1\" 200
348
\"http://www.arte.tv/hbbtvv2/notv/cehtml/index.cehtml?lang=de_DE&page=PLUS7&tv=false\"
\"Opera/9.80 (Linux armv7l; HbbTV/1.1.1 (; Philips; ; ; PhilipsTV; )
CE-HTML/1.0 NETTV/4.3.1 PhilipsTV/2.1.1 Firmware/003.015.000.001
(PhilipsTV, 2.1.1,) en) Presto/2.12.362 Version/12.11 \"
hbbtvdyn.arte.tv","@version":"1","@timestamp":"2016-02-25T10:19:53.000Z","path":"/data/logs/access","host":"arte-hbbtvdyn-web1.sdv.fr","type":"apache-access","application":"hbbtv","clientip":"212.95.70.118","ident":"-","auth":"-","timestamp":"25/Feb/2016:11:19:53 +0100","verb":"GET","request":"/services/web/index.php/OPA/categories/ARTEPLUS7/fr","httpversion":"1.1","response":"200","bytes":"348","referrer":"\"http://www.arte.tv/hbbtvv2/notv/cehtml/index.cehtml?lang=de_DE&page=PLUS7&tv=false\"","agent":"\"Opera/9.80
(Linux armv7l; HbbTV/1.1.1 (; Philips; ; ; PhilipsTV; ) CE-HTML/1.0
NETTV/4.3.1 PhilipsTV/2.1.1 Firmware/003.015.000.001 (PhilipsTV,
2.1.1,) en) Presto/2.12.362 Version/12.11 \"","targethost":"hbbtvdyn.arte.tv","geoip":{"ip":"212.95.70.118","country_code2":"FR","country_code3":"FRA","country_name":"France","continent_code":"EU","region_name":"C1","city_name":"Strasbourg","latitude":48.60040000000001,"longitude":7.787399999999991,"timezone":"Europe/Paris","real_region_name":"Alsace","location":[7.787399999999991,48.60040000000001]}}')
13 ('Message body', '{"message":"212.95.70.118 - -
[25/Feb/2016:11:19:53 +0100] \"GET
/services/web/index.php/OPA/videos/highlights/6/ARTEPLUS7/de/GE
HTTP/1.1\" 500 4519
\"http://www.arte.tv/hbbtvv2/notv/cehtml/index.cehtml?lang=de_DE&page=PLUS7&tv=false\"
\"Opera/9.80 (Linux armv7l; HbbTV/1.1.1 (; Philips; ; ; PhilipsTV; )
CE-HTML/1.0 NETTV/4.3.1 PhilipsTV/2.1.1 Firmware/003.015.000.001
(PhilipsTV, 2.1.1,) en) Presto/2.12.362 Version/12.11 \"
hbbtvdyn.arte.tv","@version":"1","@timestamp":"2016-02-25T10:19:53.000Z","path":"/data/logs/access","host":"arte-hbbtvdyn-web1.sdv.fr","type":"apache-access","application":"hbbtv","clientip":"212.95.70.118","ident":"-","auth":"-","timestamp":"25/Feb/2016:11:19:53 +0100","verb":"GET","request":"/services/web/index.php/OPA/videos/highlights/6/ARTEPLUS7/de/GE","httpversion":"1.1","response":"500","bytes":"4519","referrer":"\"http://www.arte.tv/hbbtvv2/notv/cehtml/index.cehtml?lang=de_DE&page=PLUS7&tv=false\"","agent":"\"Opera/9.80
(Linux armv7l; HbbTV/1.1.1 (; Philips; ; ; PhilipsTV; ) CE-HTML/1.0
NETTV/4.3.1 PhilipsTV/2.1.1 Firmware/003.015.000.001 (PhilipsTV,
2.1.1,) en) Presto/2.12.362 Version/12.11 \"","targethost":"hbbtvdyn.arte.tv","geoip":{"ip":"212.95.70.118","country_code2":"FR","country_code3":"FRA","country_name":"France","continent_code":"EU","region_name":"C1","city_name":"Strasbourg","latitude":48.60040000000001,"longitude":7.787399999999991,"timezone":"Europe/Paris","real_region_name":"Alsace","location":[7.787399999999991,48.60040000000001]}}')
14 ('Message body', '{"message":"212.95.70.119 - -
[25/Feb/2016:11:19:53 +0100] \"GET
/OPA/getOPAData.php?url=videoStreams%3Flanguage%3Dfr%26protocol%3DHTTP%26mediaType%3Dmp4%26quality%3DEQ%2CSQ%2CHQ%26profileAmm%3D%24nin%3AAMM-YTFR-HAB%2CAMM-YTFR%2CAMM-DT%26kind%3DSHOW%26availableScreens%3Dtv%26fields%3DprogramId%2Curl%2Cquality%2CaudioSlot%2CaudioCode%2CaudioLabel%2CaudioShortLabel%2Cchannel%26programId%3D048353-033-A%26platform%3DARTEPLUS7&filename=PLUS7_stream_048353-033-A_fr_FR.json
HTTP/1.1\" 200 5508 \"-\" \"Mozilla/5.0 (Linux; Tizen 2.3;
SmartHub; SMART-TV; SmartTV; U; Maple2012) AppleWebKit/538.1+ (KHTML,
like Gecko) TV Safari/538.1+ \"
hbbtvdyn.arte.tv","@version":"1","@timestamp":"2016-02-25T10:19:53.000Z","path":"/data/logs/access","host":"arte-hbbtvdyn-web1.sdv.fr","type":"apache-access","application":"hbbtv","clientip":"212.95.70.119","ident":"-","auth":"-","timestamp":"25/Feb/2016:11:19:53 +0100","verb":"GET","request":"/OPA/getOPAData.php?url=videoStreams%3Flanguage%3Dfr%26protocol%3DHTTP%26mediaType%3Dmp4%26quality%3DEQ%2CSQ%2CHQ%26profileAmm%3D%24nin%3AAMM-YTFR-HAB%2CAMM-YTFR%2CAMM-DT%26kind%3DSHOW%26availableScreens%3Dtv%26fields%3DprogramId%2Curl%2Cquality%2CaudioSlot%2CaudioCode%2CaudioLabel%2CaudioShortLabel%2Cchannel%26programId%3D048353-033-A%26platform%3DARTEPLUS7&filename=PLUS7_stream_048353-033-A_fr_FR.json","httpversion":"1.1","response":"200","bytes":"5508","referrer":"\"-\"","agent":"\"Mozilla/5.0
(Linux; Tizen 2.3; SmartHub; SMART-TV; SmartTV; U; Maple2012)
AppleWebKit/538.1+ (KHTML, like Gecko) TV Safari/538.1+
\"","targethost":"hbbtvdyn.arte.tv","geoip":{"ip":"212.95.70.119","country_code2":"FR","country_code3":"FRA","country_name":"France","continent_code":"EU","region_name":"C1","city_name":"Strasbourg","latitude":48.60040000000001,"longitude":7.787399999999991,"timezone":"Europe/Paris","real_region_name":"Alsace","location":[7.787399999999991,48.60040000000001]}}')
使用良好的速率消息(每秒几次)并且我绝对没有 grok 解析失败。
所以问题发生在 logstash 读取消息时。问题是
- 大量信息丢失
- 所有消息都有 _grokparsefailure 即使
完成了
logstash的输入部分是
rabbitmq {
host=>"arte-elasticlog.sdv.fr"
user=>"***"
password=>"***"
queue=>"hbbtv"
vhost=>"logstash"
port=>5672
auto_delete=>false
durable=>true
type => "rabbit_hbbtv"
}
_grokparsefailure 表示无法解析消息。表示消息已成功从队列中读取,但您的 grok 表达式有问题或无法应用于您的消息内容。
还有一点是rabbitmq输入的默认编解码器是"json",如果你的rabbitmq消息内容不是json,你应该将输入的编解码器设置为例如:
编解码器 => 普通{}
问题出在我的 logstash 过滤器上,
我有两个 apache 访问权限,但模式不同,所以当 logstash 尝试解析消息时,有时他有很好的模式 --> 在 ES 中,有时没有 --> 没有消息。
现在,对于我所有不同的日志,我添加(添加字段)
application-->"my application name"
在我的输入中,我所有的 grok 过滤器都取决于应用程序。
现在一切都很好,感谢您的帮助。
我有一个可以成功存储消息的 rabbitmq,但是我读取队列的 logstash 忽略了我的大部分消息。
RabbitMQ 没问题,我有一个小的python脚本来显示所有消息
import pika
i=0
def on_message(channel, method_frame, header_frame, body):
global i
print i
print("Message body", body)
channel.basic_ack(delivery_tag=method_frame.delivery_tag)
i+=1
credentials = pika.PlainCredentials('***', '***')
parameters = pika.ConnectionParameters('***',5672,'logstash', credentials=credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.exchange_declare(exchange="logstash", exchange_type="topic", passive=False, durable=True, auto_delete=False)
channel.queue_declare(queue="hbbtv", auto_delete=False, durable=True)
channel.queue_bind(queue="hbbtv", exchange="logstash", routing_key="hbbtv")
channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_message, 'hbbtv')
try:
channel.start_consuming()
except KeyboardInterrupt:
channel.stop_consuming()
connection.close()
我可以看到我所有的消息
12 ('Message body', '{"message":"212.95.70.118 - - [25/Feb/2016:11:19:53 +0100] \"GET /services/web/index.php/OPA/categories/ARTEPLUS7/fr HTTP/1.1\" 200 348 \"http://www.arte.tv/hbbtvv2/notv/cehtml/index.cehtml?lang=de_DE&page=PLUS7&tv=false\" \"Opera/9.80 (Linux armv7l; HbbTV/1.1.1 (; Philips; ; ; PhilipsTV; ) CE-HTML/1.0 NETTV/4.3.1 PhilipsTV/2.1.1 Firmware/003.015.000.001 (PhilipsTV, 2.1.1,) en) Presto/2.12.362 Version/12.11 \" hbbtvdyn.arte.tv","@version":"1","@timestamp":"2016-02-25T10:19:53.000Z","path":"/data/logs/access","host":"arte-hbbtvdyn-web1.sdv.fr","type":"apache-access","application":"hbbtv","clientip":"212.95.70.118","ident":"-","auth":"-","timestamp":"25/Feb/2016:11:19:53 +0100","verb":"GET","request":"/services/web/index.php/OPA/categories/ARTEPLUS7/fr","httpversion":"1.1","response":"200","bytes":"348","referrer":"\"http://www.arte.tv/hbbtvv2/notv/cehtml/index.cehtml?lang=de_DE&page=PLUS7&tv=false\"","agent":"\"Opera/9.80 (Linux armv7l; HbbTV/1.1.1 (; Philips; ; ; PhilipsTV; ) CE-HTML/1.0 NETTV/4.3.1 PhilipsTV/2.1.1 Firmware/003.015.000.001 (PhilipsTV, 2.1.1,) en) Presto/2.12.362 Version/12.11 \"","targethost":"hbbtvdyn.arte.tv","geoip":{"ip":"212.95.70.118","country_code2":"FR","country_code3":"FRA","country_name":"France","continent_code":"EU","region_name":"C1","city_name":"Strasbourg","latitude":48.60040000000001,"longitude":7.787399999999991,"timezone":"Europe/Paris","real_region_name":"Alsace","location":[7.787399999999991,48.60040000000001]}}') 13 ('Message body', '{"message":"212.95.70.118 - - [25/Feb/2016:11:19:53 +0100] \"GET /services/web/index.php/OPA/videos/highlights/6/ARTEPLUS7/de/GE HTTP/1.1\" 500 4519 \"http://www.arte.tv/hbbtvv2/notv/cehtml/index.cehtml?lang=de_DE&page=PLUS7&tv=false\" \"Opera/9.80 (Linux armv7l; HbbTV/1.1.1 (; Philips; ; ; PhilipsTV; ) CE-HTML/1.0 NETTV/4.3.1 PhilipsTV/2.1.1 Firmware/003.015.000.001 (PhilipsTV, 2.1.1,) en) Presto/2.12.362 Version/12.11 \" hbbtvdyn.arte.tv","@version":"1","@timestamp":"2016-02-25T10:19:53.000Z","path":"/data/logs/access","host":"arte-hbbtvdyn-web1.sdv.fr","type":"apache-access","application":"hbbtv","clientip":"212.95.70.118","ident":"-","auth":"-","timestamp":"25/Feb/2016:11:19:53 +0100","verb":"GET","request":"/services/web/index.php/OPA/videos/highlights/6/ARTEPLUS7/de/GE","httpversion":"1.1","response":"500","bytes":"4519","referrer":"\"http://www.arte.tv/hbbtvv2/notv/cehtml/index.cehtml?lang=de_DE&page=PLUS7&tv=false\"","agent":"\"Opera/9.80 (Linux armv7l; HbbTV/1.1.1 (; Philips; ; ; PhilipsTV; ) CE-HTML/1.0 NETTV/4.3.1 PhilipsTV/2.1.1 Firmware/003.015.000.001 (PhilipsTV, 2.1.1,) en) Presto/2.12.362 Version/12.11 \"","targethost":"hbbtvdyn.arte.tv","geoip":{"ip":"212.95.70.118","country_code2":"FR","country_code3":"FRA","country_name":"France","continent_code":"EU","region_name":"C1","city_name":"Strasbourg","latitude":48.60040000000001,"longitude":7.787399999999991,"timezone":"Europe/Paris","real_region_name":"Alsace","location":[7.787399999999991,48.60040000000001]}}') 14 ('Message body', '{"message":"212.95.70.119 - - [25/Feb/2016:11:19:53 +0100] \"GET /OPA/getOPAData.php?url=videoStreams%3Flanguage%3Dfr%26protocol%3DHTTP%26mediaType%3Dmp4%26quality%3DEQ%2CSQ%2CHQ%26profileAmm%3D%24nin%3AAMM-YTFR-HAB%2CAMM-YTFR%2CAMM-DT%26kind%3DSHOW%26availableScreens%3Dtv%26fields%3DprogramId%2Curl%2Cquality%2CaudioSlot%2CaudioCode%2CaudioLabel%2CaudioShortLabel%2Cchannel%26programId%3D048353-033-A%26platform%3DARTEPLUS7&filename=PLUS7_stream_048353-033-A_fr_FR.json HTTP/1.1\" 200 5508 \"-\" \"Mozilla/5.0 (Linux; Tizen 2.3; SmartHub; SMART-TV; SmartTV; U; Maple2012) AppleWebKit/538.1+ (KHTML, like Gecko) TV Safari/538.1+ \" hbbtvdyn.arte.tv","@version":"1","@timestamp":"2016-02-25T10:19:53.000Z","path":"/data/logs/access","host":"arte-hbbtvdyn-web1.sdv.fr","type":"apache-access","application":"hbbtv","clientip":"212.95.70.119","ident":"-","auth":"-","timestamp":"25/Feb/2016:11:19:53 +0100","verb":"GET","request":"/OPA/getOPAData.php?url=videoStreams%3Flanguage%3Dfr%26protocol%3DHTTP%26mediaType%3Dmp4%26quality%3DEQ%2CSQ%2CHQ%26profileAmm%3D%24nin%3AAMM-YTFR-HAB%2CAMM-YTFR%2CAMM-DT%26kind%3DSHOW%26availableScreens%3Dtv%26fields%3DprogramId%2Curl%2Cquality%2CaudioSlot%2CaudioCode%2CaudioLabel%2CaudioShortLabel%2Cchannel%26programId%3D048353-033-A%26platform%3DARTEPLUS7&filename=PLUS7_stream_048353-033-A_fr_FR.json","httpversion":"1.1","response":"200","bytes":"5508","referrer":"\"-\"","agent":"\"Mozilla/5.0 (Linux; Tizen 2.3; SmartHub; SMART-TV; SmartTV; U; Maple2012) AppleWebKit/538.1+ (KHTML, like Gecko) TV Safari/538.1+ \"","targethost":"hbbtvdyn.arte.tv","geoip":{"ip":"212.95.70.119","country_code2":"FR","country_code3":"FRA","country_name":"France","continent_code":"EU","region_name":"C1","city_name":"Strasbourg","latitude":48.60040000000001,"longitude":7.787399999999991,"timezone":"Europe/Paris","real_region_name":"Alsace","location":[7.787399999999991,48.60040000000001]}}')
使用良好的速率消息(每秒几次)并且我绝对没有 grok 解析失败。
所以问题发生在 logstash 读取消息时。问题是
- 大量信息丢失
- 所有消息都有 _grokparsefailure 即使 完成了
logstash的输入部分是
rabbitmq {
host=>"arte-elasticlog.sdv.fr"
user=>"***"
password=>"***"
queue=>"hbbtv"
vhost=>"logstash"
port=>5672
auto_delete=>false
durable=>true
type => "rabbit_hbbtv"
}
_grokparsefailure 表示无法解析消息。表示消息已成功从队列中读取,但您的 grok 表达式有问题或无法应用于您的消息内容。
还有一点是rabbitmq输入的默认编解码器是"json",如果你的rabbitmq消息内容不是json,你应该将输入的编解码器设置为例如:
编解码器 => 普通{}
问题出在我的 logstash 过滤器上, 我有两个 apache 访问权限,但模式不同,所以当 logstash 尝试解析消息时,有时他有很好的模式 --> 在 ES 中,有时没有 --> 没有消息。
现在,对于我所有不同的日志,我添加(添加字段)
application-->"my application name"
在我的输入中,我所有的 grok 过滤器都取决于应用程序。
现在一切都很好,感谢您的帮助。