gpt4 book ai didi

Elasticsearch 基于聚合结果的部分更新

转载 作者:行者123 更新时间:2023-12-04 17:35:41 24 4
gpt4 key购买 nike

我想部分更新所有基于聚合结果的对象。

这是我的对象:

{
"name": "name",
"identificationHash": "aslkdakldjka",
"isDupe": false,
...
}

我的目标是将“identificationHash”出现超过 2 次的所有文档的 isDupe 设置为“true”。

目前我正在做的是:

  1. 我得到了所有“isDupe”= false 的文档,并在“identificationHash”上进行了 Term 聚合,min_doc_count 为 2。
{
"query": {
"bool": {
"must": [
{
"term": {
"isDupe": {
"value": false,
"boost": 1
}
}
}
]
}
},
"aggregations": {
"identificationHashCount": {
"terms": {
"field": "identificationHash",
"size": 10000,
"min_doc_count": 2
}
}
}
}
  1. 对于聚合结果,我使用一个脚本进行批量更新,其中“ctx._source.isDupe=true”用于匹配我的聚合结果的所有 identificationHash。

我重复步骤 1 和 2,直到聚合查询不再有结果。

我的问题是:是否有更好的解决方案来解决这个问题?我可以用一个脚本查询做同样的事情而不用循环处理一批 1000 个标识散列吗?

最佳答案

据我所知,没有任何解决方案可以让您在拍摄时做到这一点。但是,有一种方法可以分两步完成,而无需迭代几批哈希。

想法是首先使用称为 Transforms 的功能识别所有要更新的哈希值,这不过是一种利用聚合并根据聚合结果构建新索引的功能。

一旦您的转换创建了新索引,您就可以将其用作 terms lookup mechanism通过查询运行更新并为所有具有匹配哈希的文档更新 isDupe bool 值。

因此,首先,我们要创建一个转换,该转换将创建一个新索引,其中包含包含所有需要更新的重复哈希的文档。这是使用 scripted_metric aggregation 实现的其工作是识别所有至少出现两次且 isDupe: false 的哈希值。我们还按周汇总,因此对于每一周,都会有一个文档包含该周的所有重复哈希值。

PUT _transform/dup-transform
{
"source": {
"index": "test-index",
"query": {
"term": {
"isDupe": "false"
}
}
},
"dest": {
"index": "test-dups",
"pipeline": "set-id"
},
"pivot": {
"group_by": {
"week": {
"date_histogram": {
"field": "lastModifiedDate",
"calendar_interval": "week"
}
}
},
"aggregations": {
"dups": {
"scripted_metric": {
"init_script": """
state.week = -1;
state.hashes = [:];
""",
"map_script": """
// gather all hashes from each shard and count them
def hash = doc['identificationHash.keyword'].value;

// set week
state.week = doc['lastModifiedDate'].value.get(IsoFields.WEEK_OF_WEEK_BASED_YEAR).toString();

// initialize hashes
if (!state.hashes.containsKey(hash)) {
state.hashes[hash] = 0;
}
// increment hash
state.hashes[hash] += 1;
""",
"combine_script": "return state",
"reduce_script": """
def hashes = [:];
def week = -1;
// group the hash counts from each shard and add them up
for (state in states) {
if (state == null) return null;
week = state.week;
for (hash in state.hashes.keySet()) {
if (!hashes.containsKey(hash)) {
hashes[hash] = 0;
}
hashes[hash] += state.hashes[hash];
}
}

// only return the hashes occurring at least twice
return [
'week': week,
'hashes': hashes.keySet().stream().filter(hash -> hashes[hash] >= 2)
.collect(Collectors.toList())
]
"""
}
}
}
}
}

在运行转换之前,我们需要创建定义目标文档 ID 的 set-id 管道(在转换的 dest 部分引用)这将包含哈希值,以便我们可以在更新文档的 terms 查询中引用它:

PUT _ingest/pipeline/set-id
{
"processors": [
{
"set": {
"field": "_id",
"value": "{{dups.week}}"
}
}
]
}

我们现在准备好 start the transform生成要更新的哈希列表,就像运行这个一样简单:

POST _transform/dup-transform/_start

运行后,目标索引 test-dups 将包含一个如下所示的文档:

  {
"_index" : "test-dups",
"_type" : "_doc",
"_id" : "44",
"_score" : 1.0,
"_source" : {
"week" : "2021-11-01T00:00:00.000Z",
"dups" : {
"week" : "44",
"hashes" : [
"12345"
]
}
}
},

最后,我们可以按如下方式通过查询运行更新(在目标索引中添加与每周文档一样多的 terms 查询):

POST test/_update_by_query
{
"query": {
"bool": {
"minimum_should_match": 1,
"should": [
{
"terms": {
"identificationHash": {
"index": "test-dups",
"id": "44",
"path": "dups.hashes"
}
}
},
{
"terms": {
"identificationHash": {
"index": "test-dups",
"id": "45",
"path": "dups.hashes"
}
}
}
]
}
},
"script": {
"source": "ctx._source.isDupe = true;"
}
}

只需两个简单的步骤即可!试试看,然后告诉我。

关于Elasticsearch 基于聚合结果的部分更新,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56761648/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com