gpt4 book ai didi

ElasticSearch 摄取管道 : create and update timestamp field

转载 作者:行者123 更新时间:2023-12-04 14:06:29 26 4
gpt4 key购买 nike

根据 this 在我的索引上创建时间戳字段回答,我创建了一个Ingest Pipeline运行特定索引:

PUT _ingest/pipeline/auto_now_add
{
"description": "Assigns the current date if not yet present and if the index name is whitelisted",
"processors": [
{
"script": {
"source": """
// skip if not whitelisted
if (![ "my_index_1",
"my_index_2"
].contains(ctx['_index'])) { return; }

// always update updated_at
ctx['updated_at'] = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());

"""
}
}
]
}

然后我将所有索引设置应用为默认管道

PUT _all/_settings
{
"index": {
"default_pipeline": "auto_now_add"
}
}

之后,我开始将我的对象索引到这些索引中。当我查询一个索引项时,我会得到那个在索引时更新了 updated_at 字段的项,例如:

{
_index: 'my_index_1',
_type: '_doc',
_id: 'r1285044056',
_version: 11,
_seq_no: 373,
_primary_term: 2,
found: true,
_source: {
updated_at: '2021-07-07 04:35:39',
...
}
}

我现在想要一个 created_at 字段,它只在第一次更新,所以我尝试以这种方式更新上面的脚本:

PUT _ingest/pipeline/auto_now_add
{
"description": "Assigns the current date if not yet present and if the index name is whitelisted",
"processors": [
{
"script": {
"source": """
// skip if not whitelisted
if (![ "my_index_1",
"my_index_2",
"..."
].contains(ctx['_index'])) { return; }

// always update updated_at
ctx['updated_at'] = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
// don't overwrite if present
if (ctx != null && ctx['created_at'] != null) { return; }

ctx['created_at'] = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
"""
}
}
]
}

但这个解决方案似乎不起作用:条件

if (ctx != null && ctx['created_at'] != null) { return; }

将始终失败,从而导致在索引上的每个对象更新时更新 created_at,与 updated_at 字段的方式相同,使其无用。那么,如何防止这种情况发生,并确保该字段 created_at 在 Ingestion Pipeline 创建后存在?

最佳答案

如@Val 在 this answer 中所述:

... the ingest pipeline processor(s) will only operate within the context of the document you're sending, not the one stored (if any).

因此,您将无权访问底层 _sourcedoc,因为摄取管道是为摄取而设计的 阶段,而不是更新 阶段。


当然,您可以保留 auto_now_add 管道以自动添加 updated_at,并且可以使用 created_at 扩展它(如果尚未存在)在摄取负载中)通过检查 ctx.containsKey — 因为 ctx 本质上是一个 java Map:

PUT _ingest/pipeline/auto_now_add
{
"description": "Assigns the current date if not yet present and if the index name is whitelisted",
"processors": [
{
"script": {
"source": """
// skip if not whitelisted
if (![ "my_index_1",
"my_index_2",
"..."
].contains(ctx['_index'])) { return; }

def now = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());

// guaranteee updated_at
ctx['updated_at'] = now;

// add created_at only if nonexistent in the payload
if (!ctx.containsKey('created_at')) {
ctx['created_at'] = now;
}
"""
}
}
]
}

但是,这仅在您第一次摄取文档时有效!

运行:

POST my_index_1/_doc/some_id
{
"some": "param"
}

将产生:

{
"some" : "param",
"updated_at" : "2021-07-08 10:35:13",
"created_at" : "2021-07-08 10:35:13"
}

现在,为了在每次更新文档时自动递增 updated_at您还需要一个脚本 — 这次存储在 _scripts,而不是 _ingest/pipeline:

PUT _scripts/incement_update_at__plus_new_params
{
"script": {
"lang": "painless",
"source": """
// add whatever is in the params
ctx._source.putAll(params);

// increment updated_at no matter what was in the params
ctx._source['updated_at'] = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
"""
}
}

然后,在运行 _update 调用时,通过提及上述 script 来执行此操作:

POST my_index_1/_doc/some_id/_update
{
"script": {
"id": "incement_update_at__plus_new_params",
"params": {
"your": "new params"
}
}
}

这将在不触及 created_at 的情况下增加 updated_at 并添加任何其他参数:

{
"some":"param",
"updated_at":"2021-07-08 10:49:44", <--
"created_at":"2021-07-08 10:39:55",
"your":"new params" <--
}

无耻外挂:我讨论pipelines & scripts在我的 Elasticsearch Handbook 中有详细介绍.

关于ElasticSearch 摄取管道 : create and update timestamp field,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68286853/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com