RABBITMQ 发送消息到 LOGSTASH
RABBITMQ send messages to LOGSTASH
你能帮我在 logstash 中输入 RabbitMQ 吗?
我的应用程序将代码版本发送到 rabbitmq,然后它存储在弹性堆栈中。
对于 rabbitmq 中的应用程序已创建队列
姓名:app_version_queue;
类型:经典;
耐用:真
然后使用该配置配置 logstash:
input {
rabbitmq {
id => "rabbitmyq_id"
# connect to rabbit
host => "localhost"
port => 5672
vhost => "/"
# INPUT - PRODUCERS
key => "app_version_queue"
# OUTPUT - CONSUMER
# queue for logstash
queue => "logstash"
auto_delete => false
# Exchange for logstash
exchange => logstash
exchange_type => direct
durable => "true"
# No ack will boost your perf
ack => false
}
}
output {
elasticsearch {
hosts => [ "elasticsearch:9200" ]
index => "app_version-%{+YYYY.MM.dd}"
}
}
它成功了,但现在,在 RabbitMQ 控制台中,我在 table 队列消息中看到
准备就绪:914,444
未处理:0
总计:914,444
而我在 rabbitmq 集群上的磁盘 space 在 3 天内就满了。
重新启动 rabbitmq 服务器后,所有 space 都是免费的。
更新:
所有的原因,为什么我这样做,我想从该链应用程序中删除 NIFI=>rabbit=>nifi=>elastic
我想做:app=>rabbit=>logstash=>elastic
- 队列:app_version
我的应用程序将消息发送到 nifi=>ELASTIC
Queue1 - app_version_queue
- 队列:logstash,我用 logstash 创建的
Queue2 - logstash
我尝试停止NIFI发送,但是消息发不出去
听起来你已经创建了两次基础设施:
- 在 RabbitMQ 中手动执行一次
- 在 LogStash 的配置选项中一次
你需要的是三样东西:
- 应用程序将消息发布到的交换。
- LogStash 从中使用消息的队列。
- 交换器和队列之间的绑定;队列将获得发布到交换器的每条消息的副本,并带有匹配的路由键。
你拥有的是这一切:
- 一个名为
logs
的交换器(手动创建),您的应用程序将消息发布到该交换器。
- 一个名为
app_version_queue
的队列(手动创建),没有任何消耗。
- 绑定(手动创建)将消息副本从
logs
传送到 app_version_queue
,然后永久保留在那里。
- 一个名为
logstash
的交换器(由 LogStash 创建),没有向其发布消息。
- 名为
logstash
的队列(由 LogStash 创建),LogStash 从中使用消息。
- 从
logstash
交换到 logstash
队列的绑定(由 LogStash 创建)不执行任何操作,因为没有消息发布到该交换。
- 从
logs
交换到 logstash
队列的绑定(手动创建),它实际上从您的应用程序传递消息。
因此,对于这三件事(交换、队列和绑定)中的每一个,您都需要:
- 决定名字
- 决定是您创建它,还是让 LogStash 创建它
- 将所有内容配置为使用相同的名称
例如,您可以保留名称 logs
和 app_version_queue
,然后手动创建所有内容。
那么您的 LogStash 应用程序将如下所示:
input {
rabbitmq {
id => "rabbitmyq_id"
# connect to rabbit
host => "localhost"
port => 5672
vhost => "/"
# Consume from existing queue
queue => "app_version_queue"
# No ack will boost your perf
ack => false
}
}
另一方面,您可以只创建 logs
交换,让 LogStash 创建队列和绑定,如下所示:
input {
rabbitmq {
id => "rabbitmyq_id"
# connect to rabbit
host => "localhost"
port => 5672
vhost => "/"
# Create a new queue
queue => "logstash_processing_queue"
durable => "true"
# Take a copy of all messages with the "app_version_queue" routing key from the existing exchange
exchange => "logs"
key => "app_version_queue"
# No ack will boost your perf
ack => false
}
}
或者您可以让 LogStash 创建它的 所有 ,并确保您的应用程序发布到正确的交换:
input {
rabbitmq {
id => "rabbitmyq_id"
# connect to rabbit
host => "localhost"
port => 5672
vhost => "/"
# Create a new queue
queue => "logstash_processing_queue"
durable => "true"
# Create a new exchange; point your application to publish here!
exchange => "log_exchange"
exchange_type => "direct"
# Take a copy of all messages with the "app_version_queue" routing key from the new exchange
key => "app_version_queue"
# No ack will boost your perf
ack => false
}
}
我可能会选择中间选项:交换是应用程序部署要求的一部分(如果它不能在那里发布,它将产生错误),但是任何数量的队列都可能绑定到它以用于不同的原因(可能 none 在测试环境中根本不需要设置 ElasticSearch)。
你能帮我在 logstash 中输入 RabbitMQ 吗? 我的应用程序将代码版本发送到 rabbitmq,然后它存储在弹性堆栈中。 对于 rabbitmq 中的应用程序已创建队列 姓名:app_version_queue; 类型:经典; 耐用:真
然后使用该配置配置 logstash:
input {
rabbitmq {
id => "rabbitmyq_id"
# connect to rabbit
host => "localhost"
port => 5672
vhost => "/"
# INPUT - PRODUCERS
key => "app_version_queue"
# OUTPUT - CONSUMER
# queue for logstash
queue => "logstash"
auto_delete => false
# Exchange for logstash
exchange => logstash
exchange_type => direct
durable => "true"
# No ack will boost your perf
ack => false
}
}
output {
elasticsearch {
hosts => [ "elasticsearch:9200" ]
index => "app_version-%{+YYYY.MM.dd}"
}
}
它成功了,但现在,在 RabbitMQ 控制台中,我在 table 队列消息中看到 准备就绪:914,444 未处理:0 总计:914,444
而我在 rabbitmq 集群上的磁盘 space 在 3 天内就满了。 重新启动 rabbitmq 服务器后,所有 space 都是免费的。
更新: 所有的原因,为什么我这样做,我想从该链应用程序中删除 NIFI=>rabbit=>nifi=>elastic 我想做:app=>rabbit=>logstash=>elastic
- 队列:app_version 我的应用程序将消息发送到 nifi=>ELASTIC Queue1 - app_version_queue
- 队列:logstash,我用 logstash 创建的 Queue2 - logstash
我尝试停止NIFI发送,但是消息发不出去
听起来你已经创建了两次基础设施:
- 在 RabbitMQ 中手动执行一次
- 在 LogStash 的配置选项中一次
你需要的是三样东西:
- 应用程序将消息发布到的交换。
- LogStash 从中使用消息的队列。
- 交换器和队列之间的绑定;队列将获得发布到交换器的每条消息的副本,并带有匹配的路由键。
你拥有的是这一切:
- 一个名为
logs
的交换器(手动创建),您的应用程序将消息发布到该交换器。 - 一个名为
app_version_queue
的队列(手动创建),没有任何消耗。 - 绑定(手动创建)将消息副本从
logs
传送到app_version_queue
,然后永久保留在那里。 - 一个名为
logstash
的交换器(由 LogStash 创建),没有向其发布消息。 - 名为
logstash
的队列(由 LogStash 创建),LogStash 从中使用消息。 - 从
logstash
交换到logstash
队列的绑定(由 LogStash 创建)不执行任何操作,因为没有消息发布到该交换。 - 从
logs
交换到logstash
队列的绑定(手动创建),它实际上从您的应用程序传递消息。
因此,对于这三件事(交换、队列和绑定)中的每一个,您都需要:
- 决定名字
- 决定是您创建它,还是让 LogStash 创建它
- 将所有内容配置为使用相同的名称
例如,您可以保留名称 logs
和 app_version_queue
,然后手动创建所有内容。
那么您的 LogStash 应用程序将如下所示:
input {
rabbitmq {
id => "rabbitmyq_id"
# connect to rabbit
host => "localhost"
port => 5672
vhost => "/"
# Consume from existing queue
queue => "app_version_queue"
# No ack will boost your perf
ack => false
}
}
另一方面,您可以只创建 logs
交换,让 LogStash 创建队列和绑定,如下所示:
input {
rabbitmq {
id => "rabbitmyq_id"
# connect to rabbit
host => "localhost"
port => 5672
vhost => "/"
# Create a new queue
queue => "logstash_processing_queue"
durable => "true"
# Take a copy of all messages with the "app_version_queue" routing key from the existing exchange
exchange => "logs"
key => "app_version_queue"
# No ack will boost your perf
ack => false
}
}
或者您可以让 LogStash 创建它的 所有 ,并确保您的应用程序发布到正确的交换:
input {
rabbitmq {
id => "rabbitmyq_id"
# connect to rabbit
host => "localhost"
port => 5672
vhost => "/"
# Create a new queue
queue => "logstash_processing_queue"
durable => "true"
# Create a new exchange; point your application to publish here!
exchange => "log_exchange"
exchange_type => "direct"
# Take a copy of all messages with the "app_version_queue" routing key from the new exchange
key => "app_version_queue"
# No ack will boost your perf
ack => false
}
}
我可能会选择中间选项:交换是应用程序部署要求的一部分(如果它不能在那里发布,它将产生错误),但是任何数量的队列都可能绑定到它以用于不同的原因(可能 none 在测试环境中根本不需要设置 ElasticSearch)。