RABBITMQ 发送消息到 LOGSTASH

RABBITMQ send messages to LOGSTASH

你能帮我在 logstash 中输入 RabbitMQ 吗? 我的应用程序将代码版本发送到 rabbitmq,然后它存储在弹性堆栈中。 对于 rabbitmq 中的应用程序已创建队列 姓名:app_version_queue; 类型:经典; 耐用:真

然后使用该配置配置 logstash:

input {
  rabbitmq {
    id => "rabbitmyq_id"
    # connect to rabbit
    host => "localhost"
    port => 5672
    vhost => "/"
    # INPUT - PRODUCERS
    key => "app_version_queue"
    # OUTPUT - CONSUMER
    # queue for logstash
    queue => "logstash"
    auto_delete => false
    # Exchange for logstash
    exchange => logstash
    exchange_type => direct
    durable => "true"
    # No ack will boost your perf
    ack => false
  }
}

output {
  elasticsearch {
    hosts => [ "elasticsearch:9200" ]
    index => "app_version-%{+YYYY.MM.dd}"
  }
}

它成功了,但现在,在 RabbitMQ 控制台中,我在 table 队列消息中看到 准备就绪:914,444 未处理:0 总计:914,444

而我在 rabbitmq 集群上的磁盘 space 在 3 天内就满了。 重新启动 rabbitmq 服务器后,所有 space 都是免费的。

更新: 所有的原因,为什么我这样做,我想从该链应用程序中删除 NIFI=>rabbit=>nifi=>elastic 我想做:app=>rabbit=>logstash=>elastic

  1. 队列:app_version 我的应用程序将消息发送到 nifi=>ELASTIC Queue1 - app_version_queue
  2. 队列:logstash,我用 logstash 创建的 Queue2 - logstash

我尝试停止NIFI发送,但是消息发不出去

听起来你已经创建了两次基础设施:

  • 在 RabbitMQ 中手动执行一次
  • 在 LogStash 的配置选项中一次

你需要的是三样东西:

  • 应用程序将消息发布到的交换。
  • LogStash 从中使用消息的队列。
  • 交换器和队列之间的绑定;队列将获得发布到交换器的每条消息的副本,并带有匹配的路由键。

你拥有的是这一切:

  • 一个名为 logs 的交换器(手动创建),您的应用程序将消息发布到该交换器。
  • 一个名为 app_version_queue 的队列(手动创建),没有任何消耗。
  • 绑定(手动创建)将消息副本从 logs 传送到 app_version_queue,然后永久保留在那里。
  • 一个名为 logstash 的交换器(由 LogStash 创建),没有向其发布消息。
  • 名为 logstash 的队列(由 LogStash 创建),LogStash 从中使用消息。
  • logstash 交换到 logstash 队列的绑定(由 LogStash 创建)不执行任何操作,因为没有消息发布到该交换。
  • logs 交换到 logstash 队列的绑定(手动创建),它实际上从您的应用程序传递消息。

因此,对于这三件事(交换、队列和绑定)中的每一个,您都需要:

  • 决定名字
  • 决定是您创建它,还是让 LogStash 创建它
  • 将所有内容配置为使用相同的名称

例如,您可以保留名称 logsapp_version_queue,然后手动创建所有内容。

那么您的 LogStash 应用程序将如下所示:

input {
  rabbitmq {
    id => "rabbitmyq_id"
    # connect to rabbit
    host => "localhost"
    port => 5672
    vhost => "/"
    # Consume from existing queue
    queue => "app_version_queue"
    # No ack will boost your perf
    ack => false
  }
}

另一方面,您可以只创建 logs 交换,让 LogStash 创建队列和绑定,如下所示:

input {
  rabbitmq {
    id => "rabbitmyq_id"
    # connect to rabbit
    host => "localhost"
    port => 5672
    vhost => "/"
    # Create a new queue
    queue => "logstash_processing_queue"
    durable => "true"
    # Take a copy of all messages with the "app_version_queue" routing key from the existing exchange
    exchange => "logs"
    key => "app_version_queue"
    # No ack will boost your perf
    ack => false
  }
}

或者您可以让 LogStash 创建它的 所有 ,并确保您的应用程序发布到正确的交换:

input {
  rabbitmq {
    id => "rabbitmyq_id"
    # connect to rabbit
    host => "localhost"
    port => 5672
    vhost => "/"
    # Create a new queue
    queue => "logstash_processing_queue"
    durable => "true"
    # Create a new exchange; point your application to publish here!
    exchange => "log_exchange"
    exchange_type => "direct"
    # Take a copy of all messages with the "app_version_queue" routing key from the new exchange
    key => "app_version_queue"
    # No ack will boost your perf
    ack => false
  }
}

我可能会选择中间选项:交换是应用程序部署要求的一部分(如果它不能在那里发布,它将产生错误),但是任何数量的队列都可能绑定到它以用于不同的原因(可能 none 在测试环境中根本不需要设置 ElasticSearch)。