gpt4 book ai didi

Elasticsearch 时间序列数据库日志记录示例和日期范围之间的求和

转载 作者:行者123 更新时间:2023-12-02 23:22:20 26 4
gpt4 key购买 nike

我需要编写一个应用程序来说明来自传感器的带宽,它在捕获的表中提供有关数据流的详细信息,如下所示:

[ElasticsearchType(Name = "trafficSnapshot")]
public class TrafficSnapshot
{
// use epoch_second @ https://mixmax.com/blog/30x-faster-elasticsearch-queries
[Date(Format = "epoch_second")]
public long TimeStamp { get; set; }

[Nested]
public Sample[] Samples { get; set; }
}

[ElasticsearchType(Name = "sample")]
public class Sample
{
public ulong Bytes { get; set; }
public ulong Packets { get; set; }
public string Source { get; set; }
public string Destination { get; set; }
}

可能会有很多日志条目,尤其是在每秒高流量的情况下,我相信我们可以通过 mm/dd/yyyy 分片/索引来控制增长。 (并通过删除旧索引丢弃不需要的天数)-但是,当我使用日期字符串创建索引时,我收到错误 Invalid NEST response built from a unsuccessful low level call on PUT: /15%2F12%2F2017 .如果我想拆分为日期,我应该如何定义索引?

如果我以这种格式记录数据,那么我是否可以按 IP 地址对发送的总数据和接收的总数据(在可以定义的日期范围内)执行求和,或者我最好存储/索引在我进一步发展之前,我的数据具有不同的结构?

我的完整代码在下面,今晚第一次刺伤,感谢指点(或者如果我偏离轨道并且使用logstash或类似的可能更好,请告诉我)。
public static class DateTimeEpochHelpers
{
public static DateTime FromUnixTime(this long unixTime)
{
var epoch = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);
return epoch.AddSeconds(unixTime);
}

public static long ToUnixTime(this DateTime date)
{
var epoch = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);
return Convert.ToInt64((date - epoch).TotalSeconds);
}
}

public static class ElasticClientTrafficSnapshotHelpers
{
public static void IndexSnapshot(this ElasticClient elasticClient, DateTime sampleTakenOn, Sample[] samples)
{
var timestamp = sampleTakenOn.ToUniversalTime();
var unixTime = timestamp.ToUnixTime();
var dateString = timestamp.Date.ToShortDateString();

// create the index if it doesn't exist
if (!elasticClient.IndexExists(dateString).Exists)
{
elasticClient.CreateIndex(dateString);
}

var response = elasticClient.Index(
new TrafficSnapshot
{
TimeStamp = unixTime,
Samples = samples
},
p => p
.Index(dateString)
.Id(unixTime)
);
}
}

class Program
{
static void Main(string[] args)
{
var node = new Uri("http://localhost:9200");

var settings = new ConnectionSettings(node);
var elasticClient = new ElasticClient(settings);

var timestamp = DateTime.UtcNow;

var samples = new[]
{
new Sample() {Bytes = 100, Packets = 1, Source = "193.100.100.5", Destination = "8.8.8.8"},
new Sample() {Bytes = 1022, Packets = 1, Source = "8.8.8.8", Destination = "193.100.100.5"},
new Sample() {Bytes = 66, Packets = 1, Source = "193.100.100.1", Destination = "91.100.100.1"},
new Sample() {Bytes = 554, Packets = 1, Source = "193.100.100.10", Destination = "91.100.100.2"},
new Sample() {Bytes = 89, Packets = 1, Source = "9.9.9.9", Destination = "193.100.100.20"},
};

elasticClient.IndexSnapshot(timestamp, samples);
}
}

最佳答案

// use epoch_second @ https://mixmax.com/blog/30x-faster-elasticsearch-queries
[Date(Format = "epoch_second")]
public long TimeStamp { get; set; }


我会评估这在较新版本的 Elasticsearch 中是否仍然适用。另外,对于您的用例,第二精度是否足够?您可以通过多种方式索引日期以满足不同的目的,例如用于排序、范围查询、精确值等。您可能还想使用 DateTimeDateTimeOffset类型,和 define a custom JsonConverter to serialize and deserialize to epoch_millis / epoch_second .

There will be potentially a lot of log entries especially at high traffic flows every second, I believe we can contain the growth by sharding/indexing by mm/dd/yyyy (and discard unneeded days by deleting old indexes)



对于时间序列数据,每个时间间隔创建索引是一个非常好的主意。通常,较新的数据,例如最后一天,上周,比旧数据更频繁地被搜索/聚合。通过索引到基于时间的索引,它允许您利用 hot/warm architectureshard allocation ,其中最新的索引可以存在于具有更好 IOP 的更强大的节点上,而较旧的索引可以存在于具有较低 IOP 的功能较弱的节点上。当您不再需要聚合此类数据时,您可以将这些索引快照到冷存储中。

when i create an index with a date string i get the error Invalid NEST response built from a unsuccessful low level call on PUT: /15%2F12%2F2017. How should i define the index if i want to split in to dates?



不要使用包含 / 的索引名称正如你所拥有的。您可能希望使用诸如 <year>-<month>-<day> 之类的格式。例如2017-12-16。您几乎肯定会想要利用 index templates确保为新创建的索引应用正确的映射,以及您可能需要考虑的几种方法:
  • 使用 Date Index Name processor根据文档中的时间戳字段将文档索引到正确的索引中
  • 使用 Rollover API来管理指数。有个好blog post on managing time-based indices efficiently .

  • If i log the data in this format, is it then possible for me to perform a summation per IP address for the total data send and total data received (over a date range which can be defined), or am i better off storing/indexing my data with a different structure before i progress further?



    是的。考虑将一组样本嵌套在一个文档上或将每个样本非规范化为一个文档是否有意义。查看模型,看起来样本在逻辑上可以是单独的文档,因为唯一共享的数据是时间戳。可以在顶级文档和嵌套文档上进行聚合,但使用顶级文档可能更容易表达一些查询。我建议尝试两种方法,看看哪种方法更适合您的用例。另外,看看 IP data type用于索引 IP 地址,还可以查看 ingest-geoip plugin for getting geo data from IP addresses .

    My full code is below and first stab tonight, pointers appreciated (or if i am going off track and may be better using logstash or similar please do let me know).



    有很多方法可以解决这个问题。如果您希望使用客户端来执行此操作,我建议使用批量 API 为每个请求索引多个文档,并在索引组件前面放置一个消息队列,以提供一层缓冲。 Logstash 在这里很有用,尤其是当您需要执行额外的丰富和过滤时。您可能还想查看 Curator for index management.

    关于Elasticsearch 时间序列数据库日志记录示例和日期范围之间的求和,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47840616/

    26 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com