rabbitmq / logstash 丢失消息

rabbitmq / logstash lost message

我有一个可以成功存储消息的 rabbitmq,但是我读取队列的 logstash 忽略了我的大部分消息。

RabbitMQ 没问题,我有一个小的python脚本来显示所有消息

import pika
i=0
def on_message(channel, method_frame, header_frame, body):
    global i
    print i
    print("Message body", body)
    channel.basic_ack(delivery_tag=method_frame.delivery_tag)
    i+=1

credentials = pika.PlainCredentials('***', '***')
parameters =  pika.ConnectionParameters('***',5672,'logstash', credentials=credentials)
connection = pika.BlockingConnection(parameters)

channel = connection.channel()
channel.exchange_declare(exchange="logstash", exchange_type="topic", passive=False, durable=True, auto_delete=False)
channel.queue_declare(queue="hbbtv", auto_delete=False, durable=True)
channel.queue_bind(queue="hbbtv", exchange="logstash", routing_key="hbbtv")
channel.basic_qos(prefetch_count=1)

channel.basic_consume(on_message, 'hbbtv')

try:
    channel.start_consuming()
except KeyboardInterrupt:
    channel.stop_consuming()

connection.close()

我可以看到我所有的消息

12 ('Message body', '{"message":"212.95.70.118 - - [25/Feb/2016:11:19:53 +0100] \"GET /services/web/index.php/OPA/categories/ARTEPLUS7/fr HTTP/1.1\" 200 348 \"http://www.arte.tv/hbbtvv2/notv/cehtml/index.cehtml?lang=de_DE&page=PLUS7&tv=false\" \"Opera/9.80 (Linux armv7l; HbbTV/1.1.1 (; Philips; ; ; PhilipsTV; ) CE-HTML/1.0 NETTV/4.3.1 PhilipsTV/2.1.1 Firmware/003.015.000.001 (PhilipsTV, 2.1.1,) en) Presto/2.12.362 Version/12.11 \" hbbtvdyn.arte.tv","@version":"1","@timestamp":"2016-02-25T10:19:53.000Z","path":"/data/logs/access","host":"arte-hbbtvdyn-web1.sdv.fr","type":"apache-access","application":"hbbtv","clientip":"212.95.70.118","ident":"-","auth":"-","timestamp":"25/Feb/2016:11:19:53 +0100","verb":"GET","request":"/services/web/index.php/OPA/categories/ARTEPLUS7/fr","httpversion":"1.1","response":"200","bytes":"348","referrer":"\"http://www.arte.tv/hbbtvv2/notv/cehtml/index.cehtml?lang=de_DE&page=PLUS7&tv=false\"","agent":"\"Opera/9.80 (Linux armv7l; HbbTV/1.1.1 (; Philips; ; ; PhilipsTV; ) CE-HTML/1.0 NETTV/4.3.1 PhilipsTV/2.1.1 Firmware/003.015.000.001 (PhilipsTV, 2.1.1,) en) Presto/2.12.362 Version/12.11 \"","targethost":"hbbtvdyn.arte.tv","geoip":{"ip":"212.95.70.118","country_code2":"FR","country_code3":"FRA","country_name":"France","continent_code":"EU","region_name":"C1","city_name":"Strasbourg","latitude":48.60040000000001,"longitude":7.787399999999991,"timezone":"Europe/Paris","real_region_name":"Alsace","location":[7.787399999999991,48.60040000000001]}}') 13 ('Message body', '{"message":"212.95.70.118 - - [25/Feb/2016:11:19:53 +0100] \"GET /services/web/index.php/OPA/videos/highlights/6/ARTEPLUS7/de/GE HTTP/1.1\" 500 4519 \"http://www.arte.tv/hbbtvv2/notv/cehtml/index.cehtml?lang=de_DE&page=PLUS7&tv=false\" \"Opera/9.80 (Linux armv7l; HbbTV/1.1.1 (; Philips; ; ; PhilipsTV; ) CE-HTML/1.0 NETTV/4.3.1 PhilipsTV/2.1.1 Firmware/003.015.000.001 (PhilipsTV, 2.1.1,) en) Presto/2.12.362 Version/12.11 \" hbbtvdyn.arte.tv","@version":"1","@timestamp":"2016-02-25T10:19:53.000Z","path":"/data/logs/access","host":"arte-hbbtvdyn-web1.sdv.fr","type":"apache-access","application":"hbbtv","clientip":"212.95.70.118","ident":"-","auth":"-","timestamp":"25/Feb/2016:11:19:53 +0100","verb":"GET","request":"/services/web/index.php/OPA/videos/highlights/6/ARTEPLUS7/de/GE","httpversion":"1.1","response":"500","bytes":"4519","referrer":"\"http://www.arte.tv/hbbtvv2/notv/cehtml/index.cehtml?lang=de_DE&page=PLUS7&tv=false\"","agent":"\"Opera/9.80 (Linux armv7l; HbbTV/1.1.1 (; Philips; ; ; PhilipsTV; ) CE-HTML/1.0 NETTV/4.3.1 PhilipsTV/2.1.1 Firmware/003.015.000.001 (PhilipsTV, 2.1.1,) en) Presto/2.12.362 Version/12.11 \"","targethost":"hbbtvdyn.arte.tv","geoip":{"ip":"212.95.70.118","country_code2":"FR","country_code3":"FRA","country_name":"France","continent_code":"EU","region_name":"C1","city_name":"Strasbourg","latitude":48.60040000000001,"longitude":7.787399999999991,"timezone":"Europe/Paris","real_region_name":"Alsace","location":[7.787399999999991,48.60040000000001]}}') 14 ('Message body', '{"message":"212.95.70.119 - - [25/Feb/2016:11:19:53 +0100] \"GET /OPA/getOPAData.php?url=videoStreams%3Flanguage%3Dfr%26protocol%3DHTTP%26mediaType%3Dmp4%26quality%3DEQ%2CSQ%2CHQ%26profileAmm%3D%24nin%3AAMM-YTFR-HAB%2CAMM-YTFR%2CAMM-DT%26kind%3DSHOW%26availableScreens%3Dtv%26fields%3DprogramId%2Curl%2Cquality%2CaudioSlot%2CaudioCode%2CaudioLabel%2CaudioShortLabel%2Cchannel%26programId%3D048353-033-A%26platform%3DARTEPLUS7&filename=PLUS7_stream_048353-033-A_fr_FR.json HTTP/1.1\" 200 5508 \"-\" \"Mozilla/5.0 (Linux; Tizen 2.3; SmartHub; SMART-TV; SmartTV; U; Maple2012) AppleWebKit/538.1+ (KHTML, like Gecko) TV Safari/538.1+ \" hbbtvdyn.arte.tv","@version":"1","@timestamp":"2016-02-25T10:19:53.000Z","path":"/data/logs/access","host":"arte-hbbtvdyn-web1.sdv.fr","type":"apache-access","application":"hbbtv","clientip":"212.95.70.119","ident":"-","auth":"-","timestamp":"25/Feb/2016:11:19:53 +0100","verb":"GET","request":"/OPA/getOPAData.php?url=videoStreams%3Flanguage%3Dfr%26protocol%3DHTTP%26mediaType%3Dmp4%26quality%3DEQ%2CSQ%2CHQ%26profileAmm%3D%24nin%3AAMM-YTFR-HAB%2CAMM-YTFR%2CAMM-DT%26kind%3DSHOW%26availableScreens%3Dtv%26fields%3DprogramId%2Curl%2Cquality%2CaudioSlot%2CaudioCode%2CaudioLabel%2CaudioShortLabel%2Cchannel%26programId%3D048353-033-A%26platform%3DARTEPLUS7&filename=PLUS7_stream_048353-033-A_fr_FR.json","httpversion":"1.1","response":"200","bytes":"5508","referrer":"\"-\"","agent":"\"Mozilla/5.0 (Linux; Tizen 2.3; SmartHub; SMART-TV; SmartTV; U; Maple2012) AppleWebKit/538.1+ (KHTML, like Gecko) TV Safari/538.1+ \"","targethost":"hbbtvdyn.arte.tv","geoip":{"ip":"212.95.70.119","country_code2":"FR","country_code3":"FRA","country_name":"France","continent_code":"EU","region_name":"C1","city_name":"Strasbourg","latitude":48.60040000000001,"longitude":7.787399999999991,"timezone":"Europe/Paris","real_region_name":"Alsace","location":[7.787399999999991,48.60040000000001]}}')

使用良好的速率消息(每秒几次)并且我绝对没有 grok 解析失败。

所以问题发生在 logstash 读取消息时。问题是

logstash的输入部分是

rabbitmq {
    host=>"arte-elasticlog.sdv.fr"
    user=>"***"
    password=>"***"
    queue=>"hbbtv"
    vhost=>"logstash"
    port=>5672
    auto_delete=>false
    durable=>true
    type => "rabbit_hbbtv"
  }

_grokparsefailure 表示无法解析消息。表示消息已成功从队列中读取,但您的 grok 表达式有问题或无法应用于您的消息内容。

还有一点是rabbitmq输入的默认编解码器是"json",如果你的rabbitmq消息内容不是json,你应该将输入的编解码器设置为例如:

编解码器 => 普通{}

问题出在我的 logstash 过滤器上, 我有两个 apache 访问权限,但模式不同,所以当 logstash 尝试解析消息时,有时他有很好的模式 --> 在 ES 中,有时没有 --> 没有消息。

现在,对于我所有不同的日志,我添加(添加字段)

application-->"my application name"

在我的输入中,我所有的 grok 过滤器都取决于应用程序。

现在一切都很好,感谢您的帮助。