如何检测logstash输入连接错误
How to detect logstash input connection error
kafka连接logstash时如何监控和检测错误
举个例子,我的 kafka broker 挂了,kafka 和 logstash 之间没有建立连接。
有没有什么办法可以让监视器知道logstash和kafka之间的连接状态?
我可以查询 logstash 日志(但我认为这不是合适的方式)并且我尝试使用 logstash 监控 API(例如 localhost:9600/_node/stats/pipelines?pretty)但是否 api 告诉我连接状态已关闭
提前致谢
如果您在 Kafka 节点上安装了 elastic agent
或 metricbeat agent
,您可以配置代理以使用其 Kafka 特定模块监视它们。
如您所述,要从 logstash 获取连接状态,您还可以配置 logstash 配置以从日志消息中获取状态。
elasticsearch 中的示例文档:
{
"_index": "topicname",
"_type": "_doc",
"_id": "ulF8uH0BK9MbBSR7DPEw",
"_version": 1,
"_score": null,
"fields": {
"@timestamp": [
"2022-05-09T10:27:56.956Z"
],
"@version": [
"1"
],
"@version.keyword": [
"1"
],
"message": [
"{\"requestMethod\":\"GET\",\"headers\":{\"content-type\":\"application/json\",\"user-agent\":\"PostmanRuntime/7.XX.XX\",\"accept\":\"*/*\",\"postman-token\":\"11224442345223\",\"host\":\"localhost:2300\",\"accept-encoding\":\"gzip, deflate, br\",\"connection\":\"keep-alive\",\"content-length\":\"44\"},\"body\":{\"category\":\"CAT\",\"noise\":\"purr\"},\"query\":{},\"requestUrl\":\"http://localhost:2300/kafka\",\"protocol\":\"HTTP/1.1\",\"remoteIp\":\"1\",\"requestSize\":302,\"userAgent\":\"PostmanRuntime/7.XX.X\",\"statusCode\":200,\"response\":{\"success\":true,\"message\":\"Kafka Details are added\",\"data\":{\"kafkaData\":{\"_id\":\"12gvsddwqbwrfteacr313rcet5\",\"category\":\"DOG\",\"noise\":\"bark\",\"__v\":0},\"postData\":{\"category\":\"DOG\",\"noise\":\"bark\"}}},\"latency\":{\"seconds\":0,\"nanos\":61000000},\"responseSize\":193}"
]
} }
可以添加以下配置来获取状态:
input {
kafka {
topics => ["topicname"]
bootstrap_servers => "11.11.11.11:1111"
}
}
filter{
mutate { add_field => { "StatusCode" => "%{[message][0][status]}" } }
}
output {
elasticsearch {
hosts => ["11.11.11.12:9200"]
index => "topic-name-index"
}
}
kafka连接logstash时如何监控和检测错误
举个例子,我的 kafka broker 挂了,kafka 和 logstash 之间没有建立连接。
有没有什么办法可以让监视器知道logstash和kafka之间的连接状态? 我可以查询 logstash 日志(但我认为这不是合适的方式)并且我尝试使用 logstash 监控 API(例如 localhost:9600/_node/stats/pipelines?pretty)但是否 api 告诉我连接状态已关闭
提前致谢
如果您在 Kafka 节点上安装了 elastic agent
或 metricbeat agent
,您可以配置代理以使用其 Kafka 特定模块监视它们。
如您所述,要从 logstash 获取连接状态,您还可以配置 logstash 配置以从日志消息中获取状态。
elasticsearch 中的示例文档:
{
"_index": "topicname",
"_type": "_doc",
"_id": "ulF8uH0BK9MbBSR7DPEw",
"_version": 1,
"_score": null,
"fields": {
"@timestamp": [
"2022-05-09T10:27:56.956Z"
],
"@version": [
"1"
],
"@version.keyword": [
"1"
],
"message": [
"{\"requestMethod\":\"GET\",\"headers\":{\"content-type\":\"application/json\",\"user-agent\":\"PostmanRuntime/7.XX.XX\",\"accept\":\"*/*\",\"postman-token\":\"11224442345223\",\"host\":\"localhost:2300\",\"accept-encoding\":\"gzip, deflate, br\",\"connection\":\"keep-alive\",\"content-length\":\"44\"},\"body\":{\"category\":\"CAT\",\"noise\":\"purr\"},\"query\":{},\"requestUrl\":\"http://localhost:2300/kafka\",\"protocol\":\"HTTP/1.1\",\"remoteIp\":\"1\",\"requestSize\":302,\"userAgent\":\"PostmanRuntime/7.XX.X\",\"statusCode\":200,\"response\":{\"success\":true,\"message\":\"Kafka Details are added\",\"data\":{\"kafkaData\":{\"_id\":\"12gvsddwqbwrfteacr313rcet5\",\"category\":\"DOG\",\"noise\":\"bark\",\"__v\":0},\"postData\":{\"category\":\"DOG\",\"noise\":\"bark\"}}},\"latency\":{\"seconds\":0,\"nanos\":61000000},\"responseSize\":193}"
]
} }
可以添加以下配置来获取状态:
input {
kafka {
topics => ["topicname"]
bootstrap_servers => "11.11.11.11:1111"
}
}
filter{
mutate { add_field => { "StatusCode" => "%{[message][0][status]}" } }
}
output {
elasticsearch {
hosts => ["11.11.11.12:9200"]
index => "topic-name-index"
}
}