gpt4 book ai didi

elasticsearch - 使用 Elasticsearch 对连续文档进行分组

转载 作者:行者123 更新时间:2023-12-02 22:40:39 25 4
gpt4 key购买 nike

有没有办法让 Elasticsearch 在分组时考虑序列间隙?

前提是将以下数据批量导入到Elasticsearch:

{ "index": { "_index": "test", "_type": "groupingTest", "_id": "1" } }
{ "sequence": 1, "type": "A" }
{ "index": { "_index": "test", "_type": "groupingTest", "_id": "2" } }
{ "sequence": 2, "type": "A" }
{ "index": { "_index": "test", "_type": "groupingTest", "_id": "3" } }
{ "sequence": 3, "type": "B" }
{ "index": { "_index": "test", "_type": "groupingTest", "_id": "4" } }
{ "sequence": 4, "type": "A" }
{ "index": { "_index": "test", "_type": "groupingTest", "_id": "5" } }
{ "sequence": 5, "type": "A" }

有没有办法以这样的方式查询这些数据

  • 序列号为1和2的文档进入一个输出组,
  • 序号为3的文档转到另一个,
  • 序列号为 4 和 5 的文档是否属于第三组?

...考虑到 A 类序列被 B 类项目(或任何其他非 A 类项目)打断的事实?

我希望结果桶看起来像这样(sequence_group 的名称和值可能不同 - 只是试图说明逻辑):

"buckets": [
{
"key": "a",
"sequence_group": 1,
"doc_count": 2
},
{
"key": "b",
"sequence_group": 3,
"doc_count": 1
},
{
"key": "a",
"sequence_group": 4,
"doc_count": 2
}
]

https://www.simple-talk.com/sql/t-sql-programming/the-sql-of-gaps-and-islands-in-sequences/ 上对问题和一些 SQL 解决方法有很好的描述。 .我想知道是否也有适用于 elasticsearch 的解决方案。

最佳答案

我们可以在这里使用以 map-reduce 方式工作的Scripted Metric Aggregation(引用 link)。它有不同的部分,如 init、map、combine 和 reduce。而且,好处是所有这些的结果也可以是列表或 map 。

我在这方面做了一些尝试。

使用的 ElasticSearch 版本:7.1

创建索引:

PUT test
{
"mappings": {
"properties": {
"sequence": {
"type": "long"
},
"type": {
"type": "text",
"fielddata": true
}
}
}
}

批量索引:(请注意,我删除了映射类型“groupingTest”)

POST _bulk
{ "index": { "_index": "test", "_id": "1" } }
{ "sequence": 1, "type": "A" }
{ "index": { "_index": "test", "_id": "2" } }
{ "sequence": 2, "type": "A" }
{ "index": { "_index": "test", "_id": "3" } }
{ "sequence": 3, "type": "B" }
{ "index": { "_index": "test", "_id": "4" } }
{ "sequence": 4, "type": "A" }
{ "index": { "_index": "test", "_id": "5" } }
{ "sequence": 5, "type": "A" }

查询

GET test/_doc/_search
{
"size": 0,
"aggs": {
"scripted_agg": {
"scripted_metric": {
"init_script": """
state.seqTypeArr = [];
""",
"map_script": """
def seqType = doc.sequence.value + '_' + doc['type'].value;
state.seqTypeArr.add(seqType);
""",
"combine_script": """
def list = [];
for(seqType in state.seqTypeArr) {
list.add(seqType);
}
return list;
""",
"reduce_script": """
def fullList = [];
for(agg_value in states) {
for(x in agg_value) {
fullList.add(x);
}
}
fullList.sort((a,b) -> a.compareTo(b));
def result = [];
def item = new HashMap();
for(int i=0; i<fullList.size(); i++) {
def str = fullList.get(i);
def index = str.indexOf("_");
def ch = str.substring(index+1);
def val = str.substring(0, index);
if(item["key"] == null) {
item["key"] = ch;
item["sequence_group"] = val;
item["doc_count"] = 1;
} else if(item["key"] == ch) {
item["doc_count"] = item["doc_count"] + 1;
} else {
result.add(item);
item = new HashMap();
item["key"] = ch;
item["sequence_group"] = val;
item["doc_count"] = 1;
}
}
result.add(item);
return result;
"""
}
}
}
}

最后,输出:

{
"took" : 21,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 5,
"relation" : "eq"
},
"max_score" : null,
"hits" : [ ]
},
"aggregations" : {
"scripted_agg" : {
"value" : [
{
"doc_count" : 2,
"sequence_group" : "1",
"key" : "a"
},
{
"doc_count" : 1,
"sequence_group" : "3",
"key" : "b"
},
{
"doc_count" : 2,
"sequence_group" : "4",
"key" : "a"
}
]
}
}
}

请注意,脚本聚合对查询性能有很大影响。因此,如果没有大量文档,您可能会注意到速度有些慢。

关于elasticsearch - 使用 Elasticsearch 对连续文档进行分组,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32124110/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com