- ubuntu12.04环境下使用kvm ioctl接口实现最简单的虚拟机
- Ubuntu 通过无线网络安装Ubuntu Server启动系统后连接无线网络的方法
- 在Ubuntu上搭建网桥的方法
- ubuntu 虚拟机上网方式及相关配置详解
CFSDN坚持开源创造价值,我们致力于搭建一个资源共享平台,让每一个IT人在这里找到属于你的精彩世界.
这篇CFSDN的博客文章Go语言Elasticsearch数据清理工具思路详解由作者收集整理,如果你对这篇文章有兴趣,记得点赞哟.
微服务架构中收集通常大家都采用ELK进行日志收集,同时我们还采用了SkyWalking进行链路跟踪,而SkyWalking数据存储也用到了ES,SkyWalking每天产生大量的索引数据,如下:
WX20211008-104751@2x 。
这里一天大概产生了700左右个索引数据。对历史的链路数据我们不做过多的保留.
这里我整理了个小工具,可以定期清理es数据.
可以看到索引数据都是以日期结尾,我们可以根据日期去匹配索引数据,并对索引进行删除。这里需要考虑一点,有的Es服务开启了索引保护机制,不能通过*index去删除,只能通过索引的全名称去删除。所以我们整体流程如下:
1、获取es服务中全部索引数据.
2、根据当前时间-保留天数,获取要删除的日期.
3、通过字符串匹配,判断索引中是否包含要删除的日期,如果包含则进行删除.
4、工具友好性,我们可以通过配置文件配置ES服务地址、日期格式化类型、保留天数等信息.
要获取Es服务中全部索引数据,我们首先连接Es服务器,这里我们使用github.com/olivere/elastic/v7库操作Es.
连接ES
func GetEsClient(data Data) *elastic.Client { Init() file := "./eslog.log" logFile, _ := os.OpenFile(file, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0766) client, err := elastic.NewClient( elastic.SetURL(data.Host), elastic.SetSniff(false), elastic.SetInfoLog(log.New(logFile, "ES-INFO: ", 0)), elastic.SetTraceLog(log.New(logFile, "ES-TRACE: ", 0)), elastic.SetErrorLog(log.New(logFile, "ES-ERROR: ", 0)), ) if err != nil { return nil } return client}
我们通过GetEsClient方法,连接ES,并返回client,供后续方法使用。这里的Data是包含了ES服务地址等信息,我们后面会给出Data的数据结构.
获取全部索引数据 。
func getIndex(data Data) map[string]interface{} { client := GetEsClient(data) mapping := client.GetMapping() service := mapping.Index("*") result, err := service.Do(context.Background()) if err != nil { fmt.Printf("create index failed, err: %v", err) return nil } return result}
通过client.GetMapping().Index("*")API获取es服务中全部的索引数据,并返回,数据格式如下:
WX20211008-110537@2x 。
这次我们获取全部索引完成.
我们根据当前时间-保留天数,获取当前需要删除的日期数据。我们通过GoLang内置的函数库time完成该功能的实现.
currentTime := time.Now()//获取当前时间oldTime := currentTime.AddDate(0, 0, data.Day)//通过配置文件获取保留天数format := oldTime.Format(data.IndexFmt)//通过配置文件获取序列化日期格式
这里通过字符串匹配进行判断是否需要删除索引数据.
func delIndex(data Data) { currentTime := time.Now() oldTime := currentTime.AddDate(0, 0, data.Day) format := oldTime.Format(data.IndexFmt) index := getIndex(data)//获取全部索引 for k := range index {//遍历索引数据 fmt.Println("key:", k, "format:", format) if find := strings.Contains(k, format); find { //判断索引中是否包含要删除的日期格式, DelIndex(data, k)//如果包含则调用DelIndex方法删除 } }}
// DelIndex 删除 indexfunc DelIndex(data Data, index ...string) bool { client := GetEsClient(data) response, err := client.DeleteIndex(index...).Do(context.Background()) if err != nil { fmt.Printf("delete index failed, err: %v", err) return false } return response.Acknowledged}
通过DeleteIndexAPI删除指定的数据.
这里我们定义了Config和Data对象,对象结构如下:
type Config struct { Data []Data `json:"data"`} type Data struct { Host string `json:"host"` IndexFmt string `json:"index_fmt"` Day int `json:"day"`}
配置文件内容如下:
{ "data": [ { "host": "http://ip1:9200",//服务IP "index_fmt": "20060102",//日期格式化 "day": -1 //保留天数 保留1天 }, { "host": "http://ip2:9200/", "index_fmt": "20060102", "day": -1 }, { "host": "http://ip3:32093", "index_fmt": "2006.01.02", "day": -7 //保留天数 保留7天 } ]}
我们通过Init方法加载配置文件到Config,
var config Config func Init() { JsonParse := NewJsonStruct() //下面使用的是相对路径,config.json文件和main.go文件处于同一目录下 JsonParse.Load("config/config.json", &config)} type JsonStruct struct {} func NewJsonStruct() *JsonStruct { return &JsonStruct{}} func (jst *JsonStruct) Load(filename string, v interface{}) { //ReadFile函数会读取文件的全部内容,并将结果以[]byte类型返回 data, err := ioutil.ReadFile(filename) if err != nil { return } //读取的数据为json格式,需要进行解码 err = json.Unmarshal(data, v) if err != nil { return }}
编写Main方法运行程序:
func main() { Init() for i, datum := range config.Data { fmt.Printf("config data Host is [%s], fmt is [%s]", datum.Host, datum.IndexFmt) println(i) delIndex(datum) }}
这里我们依然遍历配置文件中的多个服务配置。可以同时管理多个Es服务.
package main import ( "encoding/json" "fmt" "io/ioutil" "strings" "time") type Config struct { Data []Data `json:"data"`} type Data struct { Host string `json:"host"` IndexFmt string `json:"index_fmt"` Day int `json:"day"`} var config Config func Init() { JsonParse := NewJsonStruct() //下面使用的是相对路径,config.json文件和main.go文件处于同一目录下 JsonParse.Load("config/config.json", &config)} type JsonStruct struct {} func NewJsonStruct() *JsonStruct { return &JsonStruct{}} func (jst *JsonStruct) Load(filename string, v interface{}) { //ReadFile函数会读取文件的全部内容,并将结果以[]byte类型返回 data, err := ioutil.ReadFile(filename) if err != nil { return } //读取的数据为json格式,需要进行解码 err = json.Unmarshal(data, v) if err != nil { return }} func delIndex(data Data) { currentTime := time.Now() oldTime := currentTime.AddDate(0, 0, data.Day) format := oldTime.Format(data.IndexFmt) index := getIndex(data) for k := range index { fmt.Println("key:", k, "format:", format) if find := strings.Contains(k, format); find { DelIndex(data, k) } }} func main() { Init() for i, datum := range config.Data { fmt.Printf("config data Host is [%s], fmt is [%s]", datum.Host, datum.IndexFmt) println(i) delIndex(datum) }}
package main import ( "context" "fmt" "github.com/olivere/elastic/v7" "log" "os" "time") // GetEsClient 初始化客户端func GetEsClient(data Data) *elastic.Client { Init() file := "./eslog.log" logFile, _ := os.OpenFile(file, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0766) // 应该判断error,此处简略 client, err := elastic.NewClient( elastic.SetURL(data.Host), elastic.SetSniff(false), elastic.SetInfoLog(log.New(logFile, "ES-INFO: ", 0)), elastic.SetTraceLog(log.New(logFile, "ES-TRACE: ", 0)), elastic.SetErrorLog(log.New(logFile, "ES-ERROR: ", 0)), ) if err != nil { return nil } return client} // IsDocExists 判断索引是否存储func IsDocExists(data Data, id string, index string) bool { client := GetEsClient(data) defer client.Stop() exist, _ := client.Exists().Index(index).Id(id).Do(context.Background()) if !exist { log.Println("ID may be incorrect! ", id) return false } return true} // PingNode 是否联通func PingNode(data Data) { start := time.Now() client := GetEsClient(data) info, code, err := client.Ping(data.Host).Do(context.Background()) if err != nil { fmt.Printf("ping es failed, err: %v", err) } duration := time.Since(start) fmt.Printf("cost time: %v", duration) fmt.Printf("Elasticsearch returned with code %d and version %s", code, info.Version.Number)} // GetDoc 获取文档func GetDoc(data Data, id string, index string) (*elastic.GetResult, error) { client := GetEsClient(data) defer client.Stop() if !IsDocExists(data, id, index) { return nil, fmt.Errorf("id不存在") } esResponse, err := client.Get().Index(index).Id(id).Do(context.Background()) if err != nil { return nil, err } return esResponse, nil} // CreateIndex 创建 indexfunc CreateIndex(data Data, index, mapping string) bool { client := GetEsClient(data) result, err := client.CreateIndex(index).BodyString(mapping).Do(context.Background()) if err != nil { fmt.Printf("create index failed, err: %v", err) return false } return result.Acknowledged} // DelIndex 删除 indexfunc DelIndex(data Data, index ...string) bool { client := GetEsClient(data) response, err := client.DeleteIndex(index...).Do(context.Background()) if err != nil { fmt.Printf("delete index failed, err: %v", err) return false } return response.Acknowledged} func getIndex(data Data) map[string]interface{} { client := GetEsClient(data) mapping := client.GetMapping() service := mapping.Index("*") result, err := service.Do(context.Background()) if err != nil { fmt.Printf("create index failed, err: %v", err) return nil } return result}
代码已经上传github需要的可自行下载.
到此这篇关于Go语言Elasticsearch数据清理工具的文章就介绍到这了,更多相关Go Elasticsearch数据清理工具内容请搜索我以前的文章或继续浏览下面的相关文章希望大家以后多多支持我! 。
原文链接:https://blog.csdn.net/July_whj/article/details/120648785 。
最后此篇关于Go语言Elasticsearch数据清理工具思路详解的文章就讲到这里了,如果你想了解更多关于Go语言Elasticsearch数据清理工具思路详解的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
我需要为元素属性动态构建 XPath 查询,其中属性值由用户提供。我不确定如何清理或清理此值以防止 XPath 等同于 SQL 注入(inject)攻击。例如(在 PHP 中): xpath("//m
问题很简单:在使用 PHPmailer 类时我应该使用任何类型的清理吗? 我制作了使用 phpmailer 类发送电子邮件的简单发送邮件表单。目前我只使用“htmlspecialchars”进行清理(
你可以在python中创建一个在for循环退出时运行清理代码的迭代吗?就像是: from random import randint class Iterable: def __iter__(
假设我定期将数据插入 SQLite 数据库,然后清除前 50% 的数据,但我不清理。 我现在是否有类似文件前 50% 的清零页面之类的东西?如果我添加另一批数据,我是否正在填写那些清零的页面? 手册中
我有一堆 HTML 代码,我想在其中删除所有 HTML 标记。 我认为 Regex(正则表达式)可以做到这一点。通过搜索和替换,我将如何执行此操作? 我尝试了 ,我认为 * 是通配符,但显然不是。
我仍在学习 Haskell,我想知道是否有一种不太冗长的方法来使用 1 行代码来表达以下语句: map (\x -> (x, (if mod x 3 == 0 then "fizz" else "")
我需要怎么做才能正确清理/转义程序化SSH命令中输入的参数? 例如,路径参数- public boolean exists(String path) { try { Chann
这个问题已经有答案了: How to clear the canvas for redrawing (25 个回答) 已关闭10 个月前。 我目前正在尝试创建一个带有雨滴落下的 Canvas ,我唯一
我目前正在使用此过程来清理/过滤用户输入的评论 -> 这个是用来去掉斜线的……和 if (get_magic_quotes_gpc()) { function stripslashe
是否可以在 portal_setup 中删除旧的导入配置文件。 目前,我的网站上有许多可追溯到 2009 年的条目:: import-all-profile-Products.Archetypes_
假设我有多个指令,包括以下内容: ...template content... ...template content... 你如何销毁指令?通常我会在 jquery 中做一些我 $('#2').re
我正在开发一个可移植java应用程序,它可以在用户的PC(Windows XP)上动态生成一些文件。现在,我想要的是在java程序退出后删除这些临时文件。显然,java的文件删除机制是不可信的。即
我有一个 argv c 程序,它反转单词,并查看它是否是回文。我只是想清理输出并让它打印原始输入而不是相反的输入,但由于它是 argv,我似乎不知道该怎么做。 int main(int argc, c
我的网页上有一篇用 markdown 写的文章,我想在索引页上显示一份简短的简历。 问题是正文有markdown,我想在简历上显示纯文本。 例如: Article text: Hello people
在下面的代码片段中,可以做些什么来a)让编译器安静,b)清理交叉的指针困惑? extern struct tree *sintablein[sintablesize]; struct tree *(*
我试图弄清楚 WeakHashMap 在垃圾收集后如何清理。正如你们中许多人可能知道的那样,当 WeakHashMap 条目的键被垃圾回收时,它会自动删除。但是,例如,如果我做这样的事情: List>
我对构建的理解是,它只编译上次构建中编辑过的Java文件,而干净构建将删除所有类文件并重新编译所有文件。那么,当单独构建就足以满足我提供最新版本的类文件的需要时,干净构建的效用是什么? 最佳答案 有时
是否有任何简单的(内置的、附加的、开源的或商业的)在 Postgresql(主从)上进行复制,以便在复制时清理从属内部的数据以符合 PCI 合规性? ETL工具怎么样?它不一定是瞬时的……最多一个小时
我有一个将数据保存到 MySQL 数据库的网站 在将 HTML 插入 MySQL 或在我的网站上显示它时,我应该转义 HTML 吗? 理想情况下,我想将原始 HTML 输入到我的数据库中,并在每次从中
我知道我已经asked一个关于 sanitizer 和转义的问题,但我有一个问题没有得到回答。 好了,到此为止。如果我有一个 PHP 脚本并且我 GET用户输入和SELECT它来自 mySQL 数据库
我是一名优秀的程序员,十分优秀!