gpt4 book ai didi

elasticsearch - 通过在另一索引中找到的时间戳过滤/汇总时间序列数据的一个Elasticsearch索引

转载 作者:行者123 更新时间:2023-12-02 22:16:44 32 4
gpt4 key购买 nike

数据

因此,我有大量不同类型的时间序列数据。目前,我选择将每种类型的数据放入自己的索引中,因为除了4个字段外,所有数据都非常不同。而且,数据以不同的速率采样,并且不能保证在同一亚秒窗口中具有相同的时间戳,因此将它们全部融合到一个大文档中也不是一件容易的事。

目标

我试图查看是否可以完全在Elasticsearch中解决的一种常见用例之一是根据从另一个索引的查询返回的时间窗口返回一个索引的聚合结果。图示:

What i'd like to accomplish

这就是我要完成的。

一些注意事项

对于“条件”数据上足够小的信号转换,我可以仅使用日期直方图和顶部命中子聚合的某种组合,但是当我有10,000或100,000的“条件”出现时,这种情况很快就会消失”。此外,这只是一个“案例”,我想从中获得100套类似的情况,以从中获取总体的最小/最大值。

这些比较基本上是我认为是同级文档或索引之间的比较,因此似乎没有任何明显的父子关系可以长期保持足够的灵活性,至少与当前数据的方式有关结构化的。

感觉应该有一个优雅的解决方案,而不是用强力的方法在Elasticsearch之外用一个查询的结果构建日期范围,并将100的时间范围输入另一个查询。

仔细阅读文档,感觉就像是Elasticsearch脚本和某些流水线式聚合的结合,这是我想要的,但是没有确定的解决方案在我身边跳出来。我确实可以使用社区中正确方向的一些指导。

谢谢。

最佳答案

我找到了一个对我有用的“解决方案”。还没有任何答案,甚至没有任何人发表评论,但是我会发布我的解决方案,以防别人来找这样的事情。我确信会有很多改进和优化的机会,如果我发现了这样的解决方案(可能是通过脚本聚合),我会回来并更新我的解决方案。

它可能不是最佳解决方案,但对我有用。关键是利用top_hitsserial_diffbucket_selector聚合器。

解决方案”

def time_edges(index, must_terms=[], should_terms=[], filter_terms=[], data_sample_accuracy_window=200):
"""
Find the affected flights and date ranges where a specific set of terms occurs in a particular ES index.

index: the Elasticsearch index to search
terms: a list of dictionaries of form { "term": { "<termname>": <value>}}
"""
query = {
"size": 0,
"timeout": "5s",
"query": {
"constant_score": {
"filter": {
"bool": {
"must": must_terms,
"should": should_terms,
"filter": filter_terms
}
}
}
},
"aggs": {
"by_flight_id": {
"terms": {"field": "flight_id", "size": 1000},
"aggs": {
"last": {
"top_hits": {
"sort": [{"@timestamp": {"order": "desc"}}],
"size": 1,
"script_fields": {
"timestamp": {
"script": "doc['@timestamp'].value"
}
}
}
},
"first": {
"top_hits": {
"sort": [{"@timestamp": {"order": "asc"}}],
"size": 1,
"script_fields": {
"timestamp": {
"script": "doc['@timestamp'].value"
}
}
}
},
"time_edges": {
"histogram": {
"min_doc_count": 1,
"interval": 1,
"script": {
"inline": "doc['@timestamp'].value",
"lang": "painless",
}
},
"aggs": {
"timestamps": {
"max": {"field": "@timestamp"}
},
"timestamp_diff": {
"serial_diff": {
"buckets_path": "timestamps",
"lag": 1
}
},
"time_delta_filter": {
"bucket_selector": {
"buckets_path": {
"timestampDiff": "timestamp_diff"
},
"script": "if (params != null && params.timestampDiff != null) { params.timestampDiff > " + str(data_sample_accuracy_window) + "} else { false }"
}
}
}
}
}
}

}
}

return es.search(index=index, body=query)

分解事情

通过“索引2”过滤结果

the condition
    "query": {
"constant_score": {
"filter": {
"bool": {
"must": must_terms,
"should": should_terms,
"filter": filter_terms
}
}
}
},
must_terms是能够获取存储在“索引2”中的“条件”的所有结果的必需值。

例如,要将结果限制为仅过去10天,并且当 condition为值10或12时,我们添加以下 must_terms
must_terms = [
{
"range": {
"@timestamp": {
"gte": "now-10d",
"lte": "now"
}
}
},
{
"terms": {"condition": [10, 12]}
}
]

这将返回一组简化的文档,然后我们可以将其传递到聚合中以找出“样本”的位置。

集合体

对于我的用例,我们对飞机有“飞行”的概念,因此我想将返回的结果按其 id分组,然后将所有事件“分解”为铲斗。
    "aggs": {
"by_flight_id": {
"terms": {"field": "flight_id", "size": 1000},


...


}
}

}

您可以使用 top_hits聚合获取第一次出现的上升沿和最后一次出现的下降沿

top_hits
    "last": {
"top_hits": {
"sort": [{"@timestamp": {"order": "desc"}}],
"size": 1,
"script_fields": {
"timestamp": {
"script": "doc['@timestamp'].value"
}
}
}
},
"first": {
"top_hits": {
"sort": [{"@timestamp": {"order": "asc"}}],
"size": 1,
"script_fields": {
"timestamp": {
"script": "doc['@timestamp'].value"
}
}
}
},

您可以在时间戳上使用直方图来获取样本。这会将您返回的结果分成每个唯一时间戳记的存储桶。这是一个昂贵的聚合,但值得。使用内联脚本,我们可以将时间戳记值用作存储桶名称。

time_edges
    "time_edges": {
"histogram": {
"min_doc_count": 1,
"interval": 1,
"script": {
"inline": "doc['@timestamp'].value",
"lang": "painless",
}
},

...

}

默认情况下,直方图聚合返回一组存储桶,每个存储桶的文档计数,但是我们需要一个 value。这是 serial_diff聚合工作所必需的,因此我们必须对结果进行 token max聚合才能返回值。
    "aggs": {
"timestamps": {
"max": {"field": "@timestamp"}
},
"timestamp_diff": {
"serial_diff": {
"buckets_path": "timestamps",
"lag": 1
}
},

...

}

我们使用 serial_diff的结果来确定两个存储桶是否大致相邻。然后,我们丢弃彼此相邻的样本,并使用 bucket_selector聚合为我们的条件创建一个合并的时间范围。这将丢弃小于我们的 data_sample_accuracy_window的存储桶。此值取决于您的数据集。
    "aggs": {

...

"time_delta_filter": {
"bucket_selector": {
"buckets_path": {
"timestampDiff": "timestamp_diff"
},
"script": "if (params != null && params.timestampDiff != null) { params.timestampDiff > " + str(data_sample_accuracy_window) + "} else { false }"
}
}
}
serial_diff结果对于我们确定 condition设置多长时间也很关键。我们的存储桶的时间戳最终代表状态信号的“上升”沿,因此下降沿是未知的,无需进行一些后处理。我们使用timestampDiff值来确定下降沿在哪里。

关于elasticsearch - 通过在另一索引中找到的时间戳过滤/汇总时间序列数据的一个Elasticsearch索引,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40249803/

32 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com