elasticsearch - In Logstash, how to remove any JSON/XML field larger than a specific size


In short, this is the stack we use for our company logs:

All Request/Response Log Files -> Filebeat -> Kafka -> Logstash -> Elasticsearch

A fairly standard approach.

However, a very large XML/JSON field may show up in an unexpected request/response format. I want to remove only that specific field/node, no matter at which level of the JSON or XML structure it sits, since the request/response can be either SOAP (XML) or REST (JSON).

In other words, I do not know the request/response message tree/structure in advance, and I do not want to discard the whole message based on its total size; I only want to drop the specific field/node that is larger than a certain size.

For example:
2019-12-03 21:41:59.409  INFO 4055 --- [ntainer#0-0-C-1] Transaction Consumer                     : Message received successfully: {"serviceId":"insertEft_TransferPropias","sourceTransaction":"CMMO","xml":"PD94bWw some very large base 64 data ...}
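
For illustration, with a hypothetical size threshold the goal would be to keep the rest of the document and drop only the oversized field, e.g. turning the payload above into (the base64 content is elided here just as in the log line):

before: {"serviceId":"insertEft_TransferPropias","sourceTransaction":"CMMO","xml":"PD94bWw some very large base 64 data ..."}
after:  {"serviceId":"insertEft_TransferPropias","sourceTransaction":"CMMO"}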

My full docker-compose:
version: '3.2'
services:

  zoo1:
    image: elevy/zookeeper:latest
    environment:
      MYID: 1
      SERVERS: zoo1
    ports:
      - "2181:2181"

  kafka1:
    image: wurstmeister/kafka
    command: [start-kafka.sh]
    depends_on:
      - zoo1
    links:
      - zoo1
    ports:
      - "9092:9092"
    environment:
      KAFKA_LISTENERS: PLAINTEXT://:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092
      KAFKA_BROKER_ID: 1
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_LOG_RETENTION_HOURS: "168"
      KAFKA_LOG_RETENTION_BYTES: "100000000"
      KAFKA_ZOOKEEPER_CONNECT: zoo1:2181
      KAFKA_CREATE_TOPICS: "log:1:1"
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'

  filebeat:
    image: docker.elastic.co/beats/filebeat:7.5.2
    command: filebeat -e -strict.perms=false
    volumes:
      - "//c/Users/Cast/docker_folders/filebeat.yml:/usr/share/filebeat/filebeat.yml:ro"
      - "//c/Users/Cast/docker_folders/sample-logs:/sample-logs"
    links:
      - kafka1
    depends_on:
      - kafka1

  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.5.2
    environment:
      - cluster.name=docker-cluster
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      - xpack.security.enabled=false
      - xpack.watcher.enabled=false
      - discovery.type=single-node
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - "//c/Users/Cast/docker_folders/esdata:/usr/share/elasticsearch/data"
    ports:
      - "9200:9200"

  kibana:
    image: docker.elastic.co/kibana/kibana:7.5.2
    volumes:
      - "//c/Users/Cast/docker_folders/kibana.yml:/usr/share/kibana/config/kibana.yml"
    restart: always
    environment:
      - SERVER_NAME=kibana.localhost
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    ports:
      - "5601:5601"
    links:
      - elasticsearch
    depends_on:
      - elasticsearch

  logstash:
    image: docker.elastic.co/logstash/logstash:7.5.2
    volumes:
      - "//c/Users/Cast/docker_folders/logstash.conf:/config-dir/logstash.conf"
    restart: always
    command: logstash -f /config-dir/logstash.conf
    ports:
      - "9600:9600"
      - "7777:7777"
    links:
      - elasticsearch
      - kafka1
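
The filebeat.yml mounted above is not included in the question. A minimal sketch of what it could look like, assuming a plain log input and the Kafka output; the paths and the topic/field names are hypothetical, chosen to line up with the compose file and the [fields][topic_name] reference in logstash.conf below:

filebeat.inputs:
  - type: log
    paths:
      - /sample-logs/*.log        # matches the sample-logs volume mounted in docker-compose
    fields:
      topic_name: app_logs        # surfaces later as [fields][topic_name] in Logstash

output.kafka:
  hosts: ["kafka1:9092"]
  topic: '%{[fields.topic_name]}'
  codec.json:
    pretty: false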

logstash.conf
input {
  kafka {
    codec => "json"
    bootstrap_servers => "kafka1:9092"
    topics => ["app_logs","request_logs"]
    tags => ["my-app"]
  }
}

filter {
  if [fields][topic_name] == "app_logs" {
    grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} *%{LOGLEVEL:level} %{DATA:pid} --- *\[%{DATA:application}] *%{DATA:class} : %{GREEDYDATA:msglog}" }
      tag_on_failure => ["not_date_line"]
    }
    date {
      match => ["timestamp", "ISO8601"]
      target => "timestamp"
    }
    if "_grokparsefailure" in [tags] {
      mutate {
        add_field => { "level" => "UNKNOWN" }
      }
    }
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "%{[fields][topic_name]}-%{+YYYY.MM.dd}"
  }
}

Imagined solution
...
grok {
  match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} *%{LOGLEVEL:level} %{DATA:pid} --- *\[%{DATA:application}] *%{DATA:class} : %{GREEDYDATA:msglog}" }
  tag_on_failure => ["not_date_line"]
}
...
if "_grokparsefailure" in [tags] {
  mutate { remove_field => [ "field1", "field2", "field3", ... "fieldN" ] }  # fields discovered dynamically based on size
}

*** EDITED

I am not sure how well this approach would perform, mainly because it seems to me that it forces Logstash to act as a blocking stage, loading the whole JSON into memory and parsing it before saving to Elasticsearch. By the way, it has not been tested under heavy load yet. A colleague of mine proposed this alternative:
input ...
filter {
  if "JAVALOG" in [tags] {
    grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} %{WORD:severity} (?<thread>\[.*]) (?<obj>.*)" }
    }

    json {
      source => "obj"
      target => "data"
      skip_on_invalid_json => true
    }
    json {
      source => "[data][entity]"
      target => "request"
      skip_on_invalid_json => true
    }
    mutate { remove_field => [ "message" ] }
    mutate { remove_field => [ "obj" ] }
    mutate { lowercase => [ "[tags][0]" ] }
    mutate { lowercase => [ "meta_path" ] }
    ruby {
      code => '
        request_msg = JSON.parse(event.get("[data][entity]"))
        # iterate over a snapshot of the keys so entries can be deleted while looping
        request_msg.keys.each do |key|
          logger.info("field is: #{key}")
          if request_msg[key].to_s.length > 10
            logger.info("field length is greater than 10!")
            request_msg.delete(key)
          end
        end
        # write the pruned payload back as JSON (to_s would produce Ruby hash syntax, not JSON)
        event.set("[data][entity]", request_msg.to_json)
      '
    }
    mutate { remove_field => ["request"] }
    json {
      source => "data"
      target => "data_1"
      skip_on_invalid_json => true
    }
  }
}
output ...
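
Since the question asks for removal at any nesting level, the top-level loop above could be replaced by a recursive walk. A minimal sketch, keeping the [data][entity] field path from the colleague's pipeline; the 10,000-byte threshold is hypothetical, and non-JSON (e.g. SOAP/XML) payloads are simply left untouched here:

ruby {
  code => '
    require "json"

    # hypothetical threshold; tune to whatever "too large" means for your stack
    max_len = 10_000

    # recursively remove any entry whose serialized value exceeds the threshold
    prune = lambda do |node|
      case node
      when Hash
        node.delete_if { |_key, value| value.to_json.bytesize > max_len }
        node.each_value { |value| prune.call(value) }
      when Array
        node.each { |value| prune.call(value) }
      end
      node
    end

    raw = event.get("[data][entity]")
    if raw.is_a?(String)
      begin
        payload = JSON.parse(raw)
        event.set("[data][entity]", prune.call(payload).to_json)
      rescue JSON::ParserError
        # non-JSON (e.g. SOAP/XML) payloads are left as-is in this sketch
      end
    end
  '
}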

Best answer

Have you looked at using the settings available on Logstash index templates?

Here is an example:

PUT my_index
{
  "mappings": {
    "properties": {
      "message": {
        "type": "keyword",
        "ignore_above": 20
      }
    }
  }
}

Source: https://www.elastic.co/guide/en/elasticsearch/reference/current/ignore-above.html
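
Since the Logstash output in the question writes to date-based indices (%{[fields][topic_name]}-%{+YYYY.MM.dd}), the mapping would have to be applied through an index template rather than to a single index. A minimal sketch using the legacy _template API available in Elasticsearch 7.x; the template name, index patterns and the 256-character limit are illustrative:

PUT _template/log_fields_template
{
  "index_patterns": ["app_logs-*", "request_logs-*"],
  "mappings": {
    "properties": {
      "message": {
        "type": "keyword",
        "ignore_above": 256
      }
    }
  }
}

Note that ignore_above only applies to keyword fields and only stops oversized values from being indexed; the original value still ends up in _source, so this limits what is searchable rather than shrinking the stored document.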

Regarding elasticsearch - In Logstash, how to remove any JSON/XML field larger than a specific size, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/61807090/
