Elasticsearch time series database logging samples and summing between date range

I need to write an application that reports on bandwidth from our sensors, which provide details about traffic flows in a table like the one below:

[ElasticsearchType(Name = "trafficSnapshot")]
public class TrafficSnapshot
{
    // use epoch_second @ https://mixmax.com/blog/30x-faster-elasticsearch-queries
    [Date(Format = "epoch_second")]
    public long TimeStamp { get; set; }

    [Nested]
    public Sample[] Samples { get; set; }
}

[ElasticsearchType(Name = "sample")]
public class Sample
{
    public ulong Bytes { get; set; }
    public ulong Packets { get; set; }
    public string Source { get; set; }
    public string Destination { get; set; }
}

There will be potentially a lot of log entries, especially at high traffic flows every second. I believe we can contain the growth by sharding/indexing by mm/dd/yyyy (and discard unneeded days by deleting old indexes) - however, when I create an index with a date string I get the error Invalid NEST response built from a unsuccessful low level call on PUT: /15%2F12%2F2017. How should I define the index if I want to split into dates?

If I log the data in this format, is it then possible for me to perform a summation per IP address for the total data sent and total data received (over a date range which can be defined), or am I better off storing/indexing my data with a different structure before I progress further?

My full code is below, a first stab tonight; pointers appreciated (or if I am going off track and may be better off using Logstash or similar, please do let me know).

public static class DateTimeEpochHelpers
{
    public static DateTime FromUnixTime(this long unixTime)
    {
        var epoch = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);
        return epoch.AddSeconds(unixTime);
    }

    public static long ToUnixTime(this DateTime date)
    {
        var epoch = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);
        return Convert.ToInt64((date - epoch).TotalSeconds);
    }
}

public static class ElasticClientTrafficSnapshotHelpers
{
    public static void IndexSnapshot(this ElasticClient elasticClient, DateTime sampleTakenOn, Sample[] samples)
    {
        var timestamp = sampleTakenOn.ToUniversalTime();
        var unixTime = timestamp.ToUnixTime();
        var dateString = timestamp.Date.ToShortDateString();

        // create the index if it doesn't exist
        if (!elasticClient.IndexExists(dateString).Exists)
        {
            elasticClient.CreateIndex(dateString);
        }

        var response = elasticClient.Index(
            new TrafficSnapshot
            {
                TimeStamp = unixTime,
                Samples = samples
            },
            p => p
                .Index(dateString)
                .Id(unixTime)
        );
    }
}

class Program
{
    static void Main(string[] args)
    {
        var node = new Uri("http://localhost:9200");

        var settings = new ConnectionSettings(node);              
        var elasticClient = new ElasticClient(settings);

        var timestamp = DateTime.UtcNow;

        var samples = new[]
        {
            new Sample() {Bytes = 100, Packets = 1, Source = "193.100.100.5", Destination = "8.8.8.8"},
            new Sample() {Bytes = 1022, Packets = 1, Source = "8.8.8.8", Destination = "193.100.100.5"},
            new Sample() {Bytes = 66, Packets = 1, Source = "193.100.100.1", Destination = "91.100.100.1"},
            new Sample() {Bytes = 554, Packets = 1, Source = "193.100.100.10", Destination = "91.100.100.2"},
            new Sample() {Bytes = 89, Packets = 1, Source = "9.9.9.9", Destination = "193.100.100.20"},
        };

        elasticClient.IndexSnapshot(timestamp, samples);
    }
}
// use epoch_second @ https://mixmax.com/blog/30x-faster-elasticsearch-queries
[Date(Format = "epoch_second")]
public long TimeStamp { get; set; }

I would assess whether this still holds true in newer versions of Elasticsearch. Also, is second precision going to be sufficient for your use case? You can index dates in multiple ways to serve different purposes, e.g. for sorting, for range queries, for exact values, etc. You may also want to use DateTime/DateTimeOffset types, and define a custom JsonConverter to serialize and deserialize to epoch_millis/epoch_second.
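As a minimal sketch of such a converter, assuming Json.NET (which NEST's default serializer is based on; the class name here is illustrative, and wiring it into NEST's serializer is left out):

```csharp
using System;
using Newtonsoft.Json;

// Sketch: serialize a DateTimeOffset as epoch seconds to match an
// epoch_second mapping. The generic JsonConverter<T> base requires
// Json.NET 11+; on older versions derive from the non-generic JsonConverter.
public class EpochSecondConverter : JsonConverter<DateTimeOffset>
{
    public override void WriteJson(JsonWriter writer, DateTimeOffset value, JsonSerializer serializer) =>
        writer.WriteValue(value.ToUnixTimeSeconds());

    public override DateTimeOffset ReadJson(JsonReader reader, Type objectType, DateTimeOffset existingValue, bool hasExistingValue, JsonSerializer serializer) =>
        DateTimeOffset.FromUnixTimeSeconds(Convert.ToInt64(reader.Value));
}
```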

There will be potentially a lot of log entries especially at high traffic flows every second, I believe we can contain the growth by sharding/indexing by mm/dd/yyyy (and discard unneeded days by deleting old indexes)

Creating indices per time interval is a very good idea for time series data. Typically, newer data, e.g. the last day or last week, is searched/aggregated more frequently than older data. By indexing into time-based indices, you can take advantage of a hot/warm architecture with shard allocation, so the newest indices can live on more powerful nodes with better IOPS, while older indices live on less powerful nodes with fewer IOPS. When you no longer need to aggregate such data, you can snapshot those indices into cold storage.
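As a sketch of what moving an older daily index to warm nodes might look like, assuming nodes are tagged with a box_type attribute (a common convention, not a requirement) via node.attr.box_type in elasticsearch.yml:

```csharp
// Sketch: relocate an older index's shards onto nodes tagged "warm".
// The index name is illustrative; this assumes nodes were started with
// node.attr.box_type set to "hot" or "warm".
var settingsResponse = elasticClient.UpdateIndexSettings("2017-12-01", u => u
    .IndexSettings(s => s
        .Setting("index.routing.allocation.require.box_type", "warm")));
```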

when i create an index with a date string i get the error Invalid NEST response built from a unsuccessful low level call on PUT: /15%2F12%2F2017. How should i define the index if i want to split in to dates?

Don't use index names that contain /. You'll probably want to use a format like <year>-<month>-<day>, e.g. 2017-12-16. You'll almost certainly want to take advantage of index templates to ensure that the correct mappings are applied to newly created indices.
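A sketch of both ideas with NEST; the "traffic-" prefix and template name are assumptions on my part, not part of your code:

```csharp
// Sketch: an index template so any index matching "traffic-*" picks up
// the TrafficSnapshot mapping automatically on first write.
var templateResponse = elasticClient.PutIndexTemplate("traffic-template", t => t
    .Template("traffic-*")
    .Mappings(m => m
        .Map<TrafficSnapshot>(mm => mm.AutoMap())));

// Build a daily index name without '/' characters, e.g. "traffic-2017-12-16".
var indexName = $"traffic-{DateTime.UtcNow:yyyy-MM-dd}";
```

A prefixed name also makes it easy to target all daily indices at once with a wildcard when searching or deleting.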

If i log the data in this format, is it then possible for me to perform a summation per IP address for the total data send and total data received (over a date range which can be defined), or am i better off storing/indexing my data with a different structure before i progress further?

Yes. Consider whether it makes sense to nest an array of samples on one document, or to denormalize each sample into its own document. Looking at the model, samples could logically be separate documents, since the only shared data is the timestamp. Aggregations can be run over both top-level documents and nested documents, but some queries may be easier to express with top-level documents. I'd recommend trying both approaches and seeing which works better for your use case. Also, take a look at the IP data type for indexing IP addresses, and check out the ingest-geoip plugin for getting geo data from IP addresses.
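With the nested mapping you have, a per-IP sum of bytes over a date range could be sketched as follows. This is untested against a cluster; in particular, passing epoch seconds as strings to match the field's epoch_second format is an assumption worth verifying:

```csharp
// Sketch: sum bytes per source IP between two UTC dates, using the
// nested "samples" mapping from the question.
var from = new DateTime(2017, 12, 1, 0, 0, 0, DateTimeKind.Utc);
var to = new DateTime(2017, 12, 16, 0, 0, 0, DateTimeKind.Utc);

var searchResponse = elasticClient.Search<TrafficSnapshot>(s => s
    .Size(0) // aggregations only, no hits
    .Query(q => q
        .DateRange(r => r
            .Field(f => f.TimeStamp)
            .GreaterThanOrEquals(from.ToUnixTime().ToString())
            .LessThan(to.ToUnixTime().ToString())))
    .Aggregations(a => a
        .Nested("samples", n => n
            .Path(p => p.Samples)
            .Aggregations(na => na
                .Terms("by_source", t => t
                    .Field(f => f.Samples.First().Source)
                    .Aggregations(ta => ta
                        .Sum("bytes_sent", sm => sm
                            .Field(f => f.Samples.First().Bytes))))))));
```

A mirror-image aggregation on Destination would give bytes received per IP.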

My full code is below and first stab tonight, pointers appreciated (or if i am going off track and may be better using logstash or similar please do let me know).

There are many ways you could tackle this. If you want to do it with the client, I'd recommend using the bulk API to index multiple documents per request, and putting a message queue in front of the indexing component to provide a layer of buffering. Logstash can be useful here, particularly if you need to do additional enrichment and filtering. You may also want to take a look at Curator for index management.
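A minimal sketch of the bulk API with NEST; "snapshots" stands in for an IEnumerable<TrafficSnapshot> drained from your queue, and the index name is illustrative:

```csharp
// Sketch: index many snapshots in a single bulk request rather than
// one Index call per document.
var bulkResponse = elasticClient.Bulk(b => b
    .Index($"traffic-{DateTime.UtcNow:yyyy-MM-dd}")
    .IndexMany(snapshots));

if (bulkResponse.Errors)
{
    // Inspect bulkResponse.ItemsWithErrors for per-document failures
    // (the bulk call can partially succeed).
}
```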