
json - Processing JSON messages from a Kafka topic with Logstash filters


I am using Logstash 2.4 to read JSON messages from a Kafka topic and send them to an Elasticsearch index.

The JSON format is as follows -

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "string",
        "optional": false,
        "field": "reloadID"
      },
      {
        "type": "string",
        "optional": false,
        "field": "externalAccountID"
      },
      {
        "type": "int64",
        "optional": false,
        "name": "org.apache.kafka.connect.data.Timestamp",
        "version": 1,
        "field": "reloadDate"
      },
      {
        "type": "int32",
        "optional": false,
        "field": "reloadAmount"
      },
      {
        "type": "string",
        "optional": true,
        "field": "reloadChannel"
      }
    ],
    "optional": false,
    "name": "reload"
  },
  "payload": {
    "reloadID": "328424295",
    "externalAccountID": "9831200013",
    "reloadDate": 1446242463000,
    "reloadAmount": 240,
    "reloadChannel": "C1"
  }
}
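Note: this schema/payload envelope is the format produced by Kafka Connect's JsonConverter when schemas are enabled (the org.apache.kafka.connect.data.Timestamp type above is the telltale sign). If the messages do come from Kafka Connect and you control the connector, an alternative to stripping the wrapper in Logstash is to disable it at the source; a sketch of the relevant converter settings, assuming a Connect-based producer:

# Kafka Connect converter settings (assumption: the envelope comes from Connect)
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

With schemas disabled, Connect emits only the bare payload object.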

Without any filter in my config file, the target document in the ES index looks like this -
{
  "_index" : "kafka_reloads",
  "_type" : "logs",
  "_id" : "AVfcyTU4SyCFNFP2z5-l",
  "_score" : 1.0,
  "_source" : {
    "schema" : {
      "type" : "struct",
      "fields" : [ {
        "type" : "string",
        "optional" : false,
        "field" : "reloadID"
      }, {
        "type" : "string",
        "optional" : false,
        "field" : "externalAccountID"
      }, {
        "type" : "int64",
        "optional" : false,
        "name" : "org.apache.kafka.connect.data.Timestamp",
        "version" : 1,
        "field" : "reloadDate"
      }, {
        "type" : "int32",
        "optional" : false,
        "field" : "reloadAmount"
      }, {
        "type" : "string",
        "optional" : true,
        "field" : "reloadChannel"
      } ],
      "optional" : false,
      "name" : "reload"
    },
    "payload" : {
      "reloadID" : "155559213",
      "externalAccountID" : "9831200014",
      "reloadDate" : 1449529746000,
      "reloadAmount" : 140,
      "reloadChannel" : "C1"
    },
    "@version" : "1",
    "@timestamp" : "2016-10-19T11:56:09.973Z"
  }
}

However, I only want the value part of the "payload" field to go into my ES index as the target JSON body. So I tried a "mutate" filter in my config file, as shown below -
input {
  kafka {
    zk_connect => "zksrv-1:2181,zksrv-2:2181,zksrv-4:2181"
    group_id => "logstash"
    topic_id => "reload"
    consumer_threads => 3
  }
}
filter {
  mutate {
    remove_field => [ "schema", "@version", "@timestamp" ]
  }
}
output {
  elasticsearch {
    hosts => ["datanode-6:9200","datanode-2:9200"]
    index => "kafka_reloads"
  }
}

With this filter, the ES documents now look like this -
{
  "_index" : "kafka_reloads",
  "_type" : "logs",
  "_id" : "AVfch0yhSyCFNFP2z59f",
  "_score" : 1.0,
  "_source" : {
    "payload" : {
      "reloadID" : "850846698",
      "externalAccountID" : "9831200013",
      "reloadDate" : 1449356706000,
      "reloadAmount" : 30,
      "reloadChannel" : "C1"
    }
  }
}

But what I actually need is the following -
{
  "_index" : "kafka_reloads",
  "_type" : "logs",
  "_id" : "AVfch0yhSyCFNFP2z59f",
  "_score" : 1.0,
  "_source" : {
    "reloadID" : "850846698",
    "externalAccountID" : "9831200013",
    "reloadDate" : 1449356706000,
    "reloadAmount" : 30,
    "reloadChannel" : "C1"
  }
}

Is there a way to achieve this? Can anyone help me with this?

I also tried the following filter -
filter {
  json {
    source => "payload"
  }
}

But it gave me the following error -

Error parsing json {:source=>"payload", :raw=>{"reloadID"=>"572584696", "externalAccountID"=>"9831200011", "reloadDate"=>1449093851000, "reloadAmount"=>180, "reloadChannel"=>"C1"}, :exception=>java.lang.ClassCastException: org.jruby.RubyHash cannot be cast to org.jruby.RubyIO, :level=>:warn}
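This error occurs because the json filter's source option expects a string field containing raw JSON text, whereas payload has already been deserialized into a hash by the time it reaches the filter - hence the RubyHash cannot be cast to RubyIO exception. The json filter only applies to unparsed string fields; a minimal sketch of its intended usage (field names here are illustrative, not from this pipeline):

filter {
  json {
    source => "message"   # must be a string holding raw JSON text
    target => "parsed"    # optional: where to put the parsed object
  }
}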

Any help will be much appreciated.

Thanks
Gautam Ghosh

Best Answer

You can achieve what you need with the following ruby filter:

ruby {
  code => "
    # remove all fields except payload at the root level
    event.to_hash.delete_if {|k, v| k != 'payload'}
    # copy all of payload's inner fields to the root level
    event.to_hash.update(event['payload'].to_hash)
    # finally, delete the payload field itself
    event.to_hash.delete_if {|k, v| k == 'payload'}
  "
}

What it does is:
  • remove all fields except payload at the root level
  • copy all of payload's inner fields to the root level
  • delete the payload field itself

You will end up with exactly what you need.
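Note that this snippet uses the Logstash 2.x event API, where event fields are accessed with bracket syntax (event['payload']). In Logstash 5.x and later that syntax was removed in favor of event.get/event.set/event.remove, so an untested sketch of the same logic for newer versions would be:

ruby {
  code => "
    payload = event.get('payload')
    # remove every top-level field except @timestamp, which Logstash requires
    event.to_hash.each_key { |k| event.remove(k) unless k == '@timestamp' }
    # promote payload's inner fields to the root level
    payload.each { |k, v| event.set(k, v) } if payload.is_a?(Hash)
  "
}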

Regarding json - Processing JSON messages from a Kafka topic with Logstash filters, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/40131364/
