Fluentd 高可用性设置和副本

Fluentd high-availability setup and duplicates

我有以下问题:

我们在高可用性设置中使用 fluentd:几 K 个转发器 -> 地理区域聚合器,最后 ES/S3 使用复制插件。 我们遇到了一个故障(日志几天都没有经过)并且自恢复以来,我们从流利的到我们的 ES 集群中获得了大量重复的记录(包括恢复后的重复数据)。 @type copy plugin 是否存在任何可能导致此类行为的已知问题?

我们的货代配置:

# TCP input
<source>
    @type forward
    port X
</source>

# Logs Forwarding
<match XXX>
    @type forward

    # forward to logs-aggregators
        <server> 
#...
        </server>

    # use tcp for heartbeat
    heartbeat_type tcp

    # use longer flush_interval to reduce CPU usage.
    # note that this is a trade-off against latency.
    flush_interval 10s

    # use file buffer to buffer events on disks.
    # max 4096 8MB chunks = 32GB of buffer space
    buffer_type file
    buffer_path /var/log/td-agent/buffer/forward
    buffer_chunk_limit 4m
    buffer_queue_limit 4096

    # use multi-threading to send buffered data in parallel
    num_threads 8

    # expire DNS cache (required for cloud environment such as EC2)
    expire_dns_cache 600
</match>

我们的聚合器配置:

# TCP input
<source>
    @type forward
    port X
</source>

# rsyslog
<source>
  @type syslog
  port X
  tag  rsyslog
</source>

# Logs storage
<match rsyslog.**>
  @type copy
  <store>
    @type elasticsearch
    hosts X
    logstash_format true
    logstash_prefix rsyslog
    logstash_dateformat %Y-%m-%d


    num_threads 8

    utc_index true
    reload_on_failure true
  </store>
</match>

# Bids storage
<match X>
  @type copy

  # push data to elasticsearch cluster
  <store>
    @type elasticsearch
    hosts X

    # save like logstash would
    logstash_format true
    logstash_prefix jita
    logstash_dateformat %Y-%m-%d

    # 64G of buffer data
    buffer_chunk_limit 16m
    buffer_queue_limit 4096
    flush_interval 5m

    num_threads 8

    # everything in UTC
    utc_index true

    #  quickly remove a dead node from the list of addresses
    reload_on_failure true
  </store>

  # additionally store data in s3 bucket
  <store>
    @type s3
    aws_key_id X
    aws_sec_key X
    s3_bucket X
    s3_region X
    s3_object_key_format %{path}/%{time_slice}_%{index}.%{file_extension}
    store_as gzip
    num_threads 8
    path logs
    buffer_path /var/log/td-agent/buffer/s3
    time_slice_format %Y-%m-%d/%H
    time_slice_wait 10m
    utc
  </store>
</match>

我知道这是一个旧话题,但我发布这个答案是为了防止有人到这里来寻找解决方案。

所有缓冲插件中都有一个名为“retry_wait”的配置项。此配置的默认值为 1 秒(1 秒)。

这意味着 fluentd 向 Elasticsearch 发送请求,如果它在 1 秒内没有收到响应,它将再次尝试发送请求。

从 Elasticsearch 方面来看,由于没有事务的概念,他们无法确定是否再次发送了相同的请求。因此,除了第一个请求中已经编制索引的内容之外,所有值都将再次尝试编制索引,从而导致重复。

在我们的例子中,大多数记录最终有大约 40 - 50 个重复项。

一个可能的解决方案可能是增加超时。 30 秒对我们来说通常有效,您可以使用该值并找出适合您的数字。

我们的解决方案是添加一个标识字段并使用 id_key 在 fluentd 配置(elasticsearch 插件)中定义它。从那时起,无论 td-agent 如何重试,我们都不再有问题:)