jdbc - Updating complex nested elasticsearch documents using logstash and jdbc


Assume an Oracle schema with the following tables and columns:

    Country
        country_id;       (Primary Key)
        country_name;
    Department
        department_id;    (Primary Key)
        department_name;
        country_id;       (Foreign key to Country:country_id)
    Employee
        employee_id;      (Primary Key)
        employee_name;
        department_id;    (Foreign key to Department:department_id)

My Elasticsearch document has a country as its root element; the country contains all of its departments, and each department in turn contains all of its employees.

So the document structure looks like this:

    {
      "mappings": {
        "country": {
          "properties": {
            "country_id":   { "type": "string" },
            "country_name": { "type": "string" },
            "department": {
              "type": "nested",
              "properties": {
                "department_id":   { "type": "string" },
                "department_name": { "type": "string" },
                "employee": {
                  "type": "nested",
                  "properties": {
                    "employee_id":   { "type": "string" },
                    "employee_name": { "type": "string" }
                  }
                }
              }
            }
          }
        }
      }
    }
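For reference, a concrete document conforming to this mapping might look as follows (the sample values are purely illustrative):

    {
      "country_id": "US",
      "country_name": "United States",
      "department": [
        {
          "department_id": "D1",
          "department_name": "Engineering",
          "employee": [
            { "employee_id": "E1", "employee_name": "Alice" },
            { "employee_id": "E2", "employee_name": "Bob" }
          ]
        }
      ]
    }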

I want to be able to run a separate JDBC input query against each table, and these queries should create/update/delete data in the Elasticsearch documents whenever data in the underlying tables is added/updated/deleted.
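A minimal sketch of one such input, assuming Logstash's jdbc input plugin with an Oracle driver; the driver path, connection details, schedule, and query are illustrative only:

    input {
      jdbc {
        jdbc_driver_library => "/path/to/ojdbc8.jar"
        jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
        jdbc_connection_string => "jdbc:oracle:thin:@//dbhost:1521/ORCL"
        jdbc_user => "scott"
        jdbc_password => "tiger"
        # one query per base table; this one emits department rows keyed by country id
        statement => "SELECT c.country_id AS id, d.department_id, d.department_name
                      FROM Country c JOIN Department d ON d.country_id = c.country_id
                      ORDER BY c.country_id"
        schedule => "* * * * *"
      }
    }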

This is a simplified example; the actual tables and data structures are more complex, so I am not looking for a solution limited to this exact case.

Is there any way to achieve this?

Thanks.

Best Answer

For the first level of nesting, use the aggregate filter directly. The rows need a common ID so they can be correlated.

filter {
  aggregate {
    task_id => "%{id}"

    code => "
      map['id'] = event.get('id')
      map['department'] ||= []
      # append the whole row as one department entry
      map['department'] << event.to_hash
    "
    push_previous_map_as_event => true
    timeout => 150000
    timeout_tags => ['aggregated']
  }

  # only the aggregated event should reach the output
  if "aggregated" not in [tags] {
    drop {}
  }
}
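One caveat: the aggregate filter only works when every event for a given task_id passes through the same filter instance in order, so the SQL should sort by that id (as in the ORDER BY above) and Logstash must run with a single pipeline worker. For example (the config file name is illustrative):

    # aggregate requires all events for one task_id to hit the same filter instance
    bin/logstash -f countries.conf --pipeline.workers 1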

Important: the output action should be "update".

output {
  elasticsearch {
    action => "update"
    ...
  }
}
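For the update to reach the right document, the elasticsearch output also needs to know which document ID to target; a minimal sketch, assuming the aggregated id field doubles as the document ID (the index name is illustrative):

    output {
      elasticsearch {
        hosts => ["localhost:9200"]
        index => "countries"
        action => "update"
        document_id => "%{id}"    # route the partial update to the matching document
        doc_as_upsert => true     # create the document if it does not exist yet
      }
    }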

One way to solve the second level is to query the already-indexed document and update it with the nested record, again using the aggregate filter. The documents should share a common ID so you can look up the correct document and insert into it.

filter {
  # fetch the already-indexed document from Elasticsearch by id
  # and store its employee field in 'emp'
  elasticsearch {
    hosts => ["${ELASTICSEARCH_HOST}/${INDEX_NAME}/${INDEX_TYPE}"]
    query => "id:%{id}"
    fields => { "employee" => "emp" }
  }

  aggregate {
    task_id => "%{id}"
    code => "
      map['id'] = event.get('id')
      map['employee'] = []
      employeeArr = []
      temp_emp = {}

      # copy the current event's fields into a plain hash
      event.to_hash.each do |key, value|
        temp_emp[key] = value
      end

      # push the object into an array
      employeeArr.push(temp_emp)

      empArr = event.get('emp')

      # attach the new employee array to each fetched record
      for emp in empArr
        emp['employee'] = employeeArr
        map['employee'].push(emp)
      end
    "
    push_previous_map_as_event => true
    timeout => 150000
    timeout_tags => ['aggregated']
  }

  if "aggregated" not in [tags] {
    drop {}
  }
}
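Note that this lookup assumes the level-1 documents already exist in the index, so in practice the pipelines have to run in hierarchy order: countries and departments first, then employees.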

output {
  elasticsearch {
    action => "update"    # important: update, not index
    ...
  }
}

Also, in order to debug the Ruby code, use the below in the output:

output {
  stdout { codec => dots }
}

Regarding jdbc - updating complex nested elasticsearch documents using logstash and jdbc, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/34879108/
