Logsash 管道在聚合过滤器插件的超时持续时间之前终止

Logsash pipeline terminating before timeout duration of aggregation filter plugin

我有一个 logstash 管道,它有一个用于输入的 elasticsearch 插件和聚合过滤器插件。聚合过滤器插件的超时持续时间为 15 秒。参考过滤插件配置

aggregate {
        task_id => "%{[test.region.keyword]}"
        code => "
                map['total_count_per_region'] ||= 0; 
                map['total_count_per_region'] += event.get('Count');
                "
        push_map_as_event_on_timeout => true
        timeout_task_id_field => "test.region.keyword"
        timeout => 15 
        timeout_tags => ['_aggregatetimeout']
        timeout_code => "
                        event.set('total_count_per_region', event.get('total_count_per_region'));
                        "
    }

我的管道配置从索引中获取文档作为输入,过滤器插件使用聚合插件执行聚合,如上所示。这将在 15 秒后生成聚合结果作为事件。

但是当我启动我的管道时,它成功启动,没有任何问题。但是,在生成聚合事件之前(15 秒后)管道终止,因此我的输出中没有这些事件。

[2021-03-23T14:12:24,610][INFO ][logstash.javapipeline    ][pTest] Starting pipeline {:pipeline_id=>"pTest", "pipeline.workers"=>3, "pipeline.batch.size"=>1500, "pipeline.batch.delay"=>600, "pipeline.max_inflight"=>4500, "pipeline.sources"=>["/etc/logstash/conf.d/test1.conf"], :thread=>"#<Thread:0x7eabfdb5 run>"}
 [2021-03-23T14:12:24,689][INFO ][logstash.agent           ] Pipelines running {:count=>2, :running_pipelines=>[:".monitoring-logstash", :pTest], :non_running_pipelines=>[  ]}
 [2021-03-23T14:12:28,601][INFO ][logstash.javapipeline    ][pTest] Pipeline terminated {"pipeline.id"=>"pTest"}

请帮忙解决这个问题

假设您是 运行 7.7 或 7.8,您可以升级到 7.9.1,或禁用 [​​=18=] 执行引擎。这可以在 logstash.yml 中使用 pipeline.java_execution: false 或在命令行上使用 --java-execution false.

完成

聚合超时由每 5 秒运行一次的“周期性刷新程序”触发。当管道关闭时,“最终刷新器”会提前触发超时。引入了 bug 以防止 java 引擎调用任何刷新程序。这已在 7.9.1 中修复。

我找到了我面临的问题的原因。我使用的是没有 schedule 属性的 elasticsearch 输入插件,因此,当我 started/restarted logstash 时,通过管道 运行 一次,然后在 运行 之后终止.我必须使用 schedule 属性将其安排为每天 运行 一次(这也是我的要求),并确保我的管道处于 运行ning 状态。

这解决了我发布的问题。每天在我的管道 运行 后 15 秒生成聚合事件时,我的管道仍处于 运行ning 状态并且事件成功通过输出。

谢谢