Logsash 管道在聚合过滤器插件的超时持续时间之前终止
Logsash pipeline terminating before timeout duration of aggregation filter plugin
我有一个 logstash 管道,它有一个用于输入的 elasticsearch 插件和聚合过滤器插件。聚合过滤器插件的超时持续时间为 15 秒。参考过滤插件配置
aggregate {
task_id => "%{[test.region.keyword]}"
code => "
map['total_count_per_region'] ||= 0;
map['total_count_per_region'] += event.get('Count');
"
push_map_as_event_on_timeout => true
timeout_task_id_field => "test.region.keyword"
timeout => 15
timeout_tags => ['_aggregatetimeout']
timeout_code => "
event.set('total_count_per_region', event.get('total_count_per_region'));
"
}
我的管道配置从索引中获取文档作为输入,过滤器插件使用聚合插件执行聚合,如上所示。这将在 15 秒后生成聚合结果作为事件。
但是当我启动我的管道时,它成功启动,没有任何问题。但是,在生成聚合事件之前(15 秒后)管道终止,因此我的输出中没有这些事件。
[2021-03-23T14:12:24,610][INFO ][logstash.javapipeline ][pTest] Starting pipeline {:pipeline_id=>"pTest", "pipeline.workers"=>3, "pipeline.batch.size"=>1500, "pipeline.batch.delay"=>600, "pipeline.max_inflight"=>4500, "pipeline.sources"=>["/etc/logstash/conf.d/test1.conf"], :thread=>"#<Thread:0x7eabfdb5 run>"}
[2021-03-23T14:12:24,689][INFO ][logstash.agent ] Pipelines running {:count=>2, :running_pipelines=>[:".monitoring-logstash", :pTest], :non_running_pipelines=>[ ]}
[2021-03-23T14:12:28,601][INFO ][logstash.javapipeline ][pTest] Pipeline terminated {"pipeline.id"=>"pTest"}
请帮忙解决这个问题
假设您是 运行 7.7 或 7.8,您可以升级到 7.9.1,或禁用 [=18=] 执行引擎。这可以在 logstash.yml 中使用 pipeline.java_execution: false
或在命令行上使用 --java-execution false
.
完成
聚合超时由每 5 秒运行一次的“周期性刷新程序”触发。当管道关闭时,“最终刷新器”会提前触发超时。引入了 bug 以防止 java 引擎调用任何刷新程序。这已在 7.9.1 中修复。
我找到了我面临的问题的原因。我使用的是没有 schedule 属性的 elasticsearch 输入插件,因此,当我 started/restarted logstash 时,通过管道 运行 一次,然后在 运行 之后终止.我必须使用 schedule 属性将其安排为每天 运行 一次(这也是我的要求),并确保我的管道处于 运行ning 状态。
这解决了我发布的问题。每天在我的管道 运行 后 15 秒生成聚合事件时,我的管道仍处于 运行ning 状态并且事件成功通过输出。
谢谢
我有一个 logstash 管道,它有一个用于输入的 elasticsearch 插件和聚合过滤器插件。聚合过滤器插件的超时持续时间为 15 秒。参考过滤插件配置
aggregate {
task_id => "%{[test.region.keyword]}"
code => "
map['total_count_per_region'] ||= 0;
map['total_count_per_region'] += event.get('Count');
"
push_map_as_event_on_timeout => true
timeout_task_id_field => "test.region.keyword"
timeout => 15
timeout_tags => ['_aggregatetimeout']
timeout_code => "
event.set('total_count_per_region', event.get('total_count_per_region'));
"
}
我的管道配置从索引中获取文档作为输入,过滤器插件使用聚合插件执行聚合,如上所示。这将在 15 秒后生成聚合结果作为事件。
但是当我启动我的管道时,它成功启动,没有任何问题。但是,在生成聚合事件之前(15 秒后)管道终止,因此我的输出中没有这些事件。
[2021-03-23T14:12:24,610][INFO ][logstash.javapipeline ][pTest] Starting pipeline {:pipeline_id=>"pTest", "pipeline.workers"=>3, "pipeline.batch.size"=>1500, "pipeline.batch.delay"=>600, "pipeline.max_inflight"=>4500, "pipeline.sources"=>["/etc/logstash/conf.d/test1.conf"], :thread=>"#<Thread:0x7eabfdb5 run>"}
[2021-03-23T14:12:24,689][INFO ][logstash.agent ] Pipelines running {:count=>2, :running_pipelines=>[:".monitoring-logstash", :pTest], :non_running_pipelines=>[ ]}
[2021-03-23T14:12:28,601][INFO ][logstash.javapipeline ][pTest] Pipeline terminated {"pipeline.id"=>"pTest"}
请帮忙解决这个问题
假设您是 运行 7.7 或 7.8,您可以升级到 7.9.1,或禁用 [=18=] 执行引擎。这可以在 logstash.yml 中使用 pipeline.java_execution: false
或在命令行上使用 --java-execution false
.
聚合超时由每 5 秒运行一次的“周期性刷新程序”触发。当管道关闭时,“最终刷新器”会提前触发超时。引入了 bug 以防止 java 引擎调用任何刷新程序。这已在 7.9.1 中修复。
我找到了我面临的问题的原因。我使用的是没有 schedule 属性的 elasticsearch 输入插件,因此,当我 started/restarted logstash 时,通过管道 运行 一次,然后在 运行 之后终止.我必须使用 schedule 属性将其安排为每天 运行 一次(这也是我的要求),并确保我的管道处于 运行ning 状态。
这解决了我发布的问题。每天在我的管道 运行 后 15 秒生成聚合事件时,我的管道仍处于 运行ning 状态并且事件成功通过输出。
谢谢