gpt4 book ai didi

elasticsearch - (logstash) 仅索引来自 kafka 输入的 elasticsearch 中的特定数据

转载 作者:行者123 更新时间:2023-12-03 01:50:55 24 4
gpt4 key购买 nike

我使用 kafka 作为输入并将其放入 elasticsearch(输出)

input {
kafka {
topics =>["maxwell"]
codec => json
}
}
filter {
}
output {
stdout { codec => rubydebug }
elasticsearch {
index => 'test_kafka'
document_type => "%{table}"
hosts => 'localhost:9200'
}
}

当它运行时,它会输出以下 json
{
"database": "my_db",
"xid": 88935,
"@timestamp": "2016-11-14T12:00:13.763Z",
"data": {
"contact_country_code": null,
"contact_type_id": 1,
"created": "2014-10-03 12:24:36",
"modified_by": null,
"modified": "2014-10-03 12:24:36",
"contact_id": 1,
"is_default": 0,
"created_by": null,
"contact_number": "1241222232"
},
"old": {
"contact_number": "1241222"
},
"commit": true,
"@version": "1",
"type": "update",
"table": "contact",
"ts": 1479124813
}

我的问题是,我怎样才能只在elasticsearch中使用动态document_type提取数据键来实现这一点
{
"_index": "test_kafka",
"_type": "contact",
"_id": "AVhitY804rvpX8qdVt9d",
"_score": 1,
"_source": {
"contact_country_code": null,
"contact_type_id": 1,
"created": "2014-10-03 12:24:36",
"modified_by": null,
"modified": "2014-10-03 12:24:36",
"contact_id": 1,
"is_default": 0,
"created_by": null,
"contact_number": "1241222232"
}
}

最佳答案

您可以添加 ruby过滤器来按摩您的事件,如下所示。它所做的是首先保存 table @metadata 中的字段字段,以便您可以在 elasticsearch 中引用它输出。然后它会删除除 data 之外的所有字段。一。然后它复制 data 中的所有字段。根级别的字段,最后删除 data field 。

input {
kafka {
topics =>["maxwell"]
codec => json
}
}
filter {
mutate {
add_field => { "[@metadata][type]" => "%{table}" }
}
ruby {
code => "
# Ruby code for Logstash 2.x
event.to_hash.delete_if {|k, v| k != 'data'}
event.to_hash.update(event['data'].to_hash)
event.to_hash.delete_if {|k, v| k == 'data'}

# Ruby code for Logstash 5.x
event.to_hash.delete_if {|k, v| k != 'data'}
event.to_hash.update(event.get('data').to_hash)
event.to_hash.delete_if {|k, v| k == 'data'}
"
}
}
output {
stdout { codec => rubydebug }
elasticsearch {
hosts => 'localhost:9200'
index => 'test_kafka'
document_type => "%{[@metadata][type]}"
}
}

关于elasticsearch - (logstash) 仅索引来自 kafka 输入的 elasticsearch 中的特定数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40600708/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com