elasticsearch - BulkIndexer in the Olivere package for Golang as a replacement for Elastigo

Reposted. Author: IT王子. Updated: 2023-10-29 01:56:39

I noticed that if I want to send data to Elasticsearch in bulk, I can use a BulkIndexer. As stated in the Elastigo documentation:

A bulk indexer creates goroutines, and channels for connecting and sending data to elasticsearch in bulk, using buffers.
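
To make that description concrete, here is a minimal standard-library sketch of the general pattern (a goroutine that buffers items arriving on a channel and flushes them in batches). It is not elastigo's actual implementation; `Batcher`, `NewBatcher`, and `batchSizes` are hypothetical names introduced only for this illustration, and the flush callback stands in for the HTTP call to Elasticsearch.

```go
package main

import (
	"fmt"
	"time"
)

// Batcher is a hypothetical stand-in for a bulk indexer (not elastigo's type).
// It buffers items received on a channel and hands them to flush in batches.
type Batcher struct {
	in    chan string
	done  chan struct{}
	flush func(batch []string)
}

// NewBatcher starts one worker goroutine. A batch is flushed when it reaches
// maxItems or when interval elapses, whichever comes first.
func NewBatcher(maxItems int, interval time.Duration, flush func([]string)) *Batcher {
	b := &Batcher{in: make(chan string), done: make(chan struct{}), flush: flush}
	go b.run(maxItems, interval)
	return b
}

func (b *Batcher) run(maxItems int, interval time.Duration) {
	defer close(b.done)
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	batch := make([]string, 0, maxItems)
	emit := func() {
		if len(batch) == 0 {
			return
		}
		b.flush(batch)
		batch = make([]string, 0, maxItems)
	}
	for {
		select {
		case item, ok := <-b.in:
			if !ok { // Close was called: flush what is left and exit.
				emit()
				return
			}
			batch = append(batch, item)
			if len(batch) >= maxItems {
				emit()
			}
		case <-ticker.C: // Periodic flush of a partially filled buffer.
			emit()
		}
	}
}

// Add queues one item for the worker goroutine.
func (b *Batcher) Add(item string) { b.in <- item }

// Close stops accepting items and waits for the final flush.
func (b *Batcher) Close() {
	close(b.in)
	<-b.done
}

// batchSizes feeds n items through a Batcher and records each batch size.
// The long interval keeps the timer from firing, so the result is deterministic.
func batchSizes(n, maxItems int) []int {
	var sizes []int
	b := NewBatcher(maxItems, time.Hour, func(batch []string) {
		sizes = append(sizes, len(batch))
	})
	for i := 0; i < n; i++ {
		b.Add(fmt.Sprintf("doc-%d", i))
	}
	b.Close()
	return sizes
}

func main() {
	fmt.Println(batchSizes(5, 2)) // prints [2 2 1]
}
```

A real bulk indexer would typically run several such workers and serialize each batch into a single bulk request, but the size-or-interval flush rule is the core idea.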

The code for bulk inserts with elastigo:

var c_es = elastigo.NewConn()
var indexer = c_es.NewBulkIndexer(50)

func insertInBulkElastic() {
	// Install a custom sender so that errors from bulk inserts into
	// Elasticsearch can be handled.
	indexer.Sender = func(buf *bytes.Buffer) error {
		// buf holds the documents about to be written.
		respJson, err := c_es.DoCommand("POST", "/_bulk", nil, buf)
		if err != nil {
			// Handle it better than this.
			fmt.Println("Error", string(respJson))
			fmt.Println("Error", err)
		} else {
			fmt.Println("The data was inserted successfully into Elasticsearch")
		}
		return err
	}
}

Does anyone know how to send bulk requests using olivere for Golang?

Thanks

Best answer

Here is a working example that uses olivere in Go. You can read more about the BulkProcessor in the olivere/elastic documentation.

Hope this helps :)

package main

import (
	"context"
	"log"
	"time"

	elastic "gopkg.in/olivere/elastic.v5"
)

func main() {
	ctx := context.Background()

	options := []elastic.ClientOptionFunc{
		elastic.SetHealthcheck(true),
		elastic.SetHealthcheckTimeout(20 * time.Second),
		elastic.SetSniff(false),
		elastic.SetHealthcheckInterval(30 * time.Second),
		elastic.SetURL("http://127.0.0.1:9200"),
		elastic.SetRetrier(elastic.NewBackoffRetrier(elastic.NewConstantBackoff(5 * time.Second))),
	}
	client, err := elastic.NewClient(options...)
	if err != nil {
		panic(err)
	}

	// Ensure the index exists.
	exists, err := client.IndexExists("my_index").Do(ctx)
	if err != nil {
		panic(err)
	}
	if !exists {
		if _, err := client.CreateIndex("my_index").Do(ctx); err != nil {
			panic(err)
		}
	}

	// Map "name" as a keyword field. Each property needs a {"type": ...}
	// object, and the v5 client requires the mapping type to be set.
	_, err = client.PutMapping().Index("my_index").Type("type").BodyJson(map[string]interface{}{
		"properties": map[string]interface{}{
			"name": map[string]interface{}{"type": "keyword"},
		},
	}).Do(ctx)
	if err != nil {
		panic(err)
	}

	// Create a new bulk processor from the client.
	bulkProcessor, err := elastic.NewBulkProcessorService(client).
		Workers(5).
		BulkActions(1000).
		FlushInterval(1 * time.Second).
		After(after).
		Do(ctx)
	if err != nil {
		panic(err)
	}
	defer bulkProcessor.Close()

	// From here on, the bulk processor can be reused across the entire app.
	myDoc := struct {
		Name string `json:"name"` // matches the "name" field in the mapping
	}{
		Name: "jack",
	}
	req := elastic.NewBulkIndexRequest()
	req.Index("my_index").Type("type").Id("my_doc_id").Doc(myDoc)

	// Use the Add method to queue the request in the processor.
	bulkProcessor.Add(req)

	// Flush blocks until all workers have committed their outstanding
	// requests, so there is no need to sleep before exiting.
	if err := bulkProcessor.Flush(); err != nil {
		log.Printf("flush failed, err: %v\n", err)
	}
}

func after(executionID int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) {
	if err != nil {
		log.Printf("bulk commit failed, err: %v\n", err)
		return
	}
	// The bulk call as a whole can succeed while individual items fail.
	if response != nil {
		if failed := response.Failed(); len(failed) > 0 {
			log.Printf("%d of %d bulk items failed\n", len(failed), len(requests))
			return
		}
	}
	// Do whatever you want when the bulk commit succeeds.
	log.Printf("commit successful, len(requests)=%d\n", len(requests))
}
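
For context, both the elastigo Sender in the question and a bulk processor like the one above end up sending a request to the `_bulk` endpoint, whose body is newline-delimited JSON: one action line followed by one document line per index operation. The sketch below builds such a body with the standard library only, so the format can be inspected without a running cluster. `appendIndexAction` and `exampleBody` are hypothetical helpers written for this illustration and are not part of olivere/elastic; the `_type` field is included on the assumption of an Elasticsearch version that still uses mapping types, as the v5 client in the answer does.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// appendIndexAction writes one bulk "index" operation to buf: an action line
// naming the target index, type, and ID, then the document source, each
// terminated by a newline as the bulk format requires.
func appendIndexAction(buf *bytes.Buffer, index, typ, id string, doc interface{}) error {
	action := map[string]map[string]string{
		"index": {"_index": index, "_type": typ, "_id": id},
	}
	for _, v := range []interface{}{action, doc} {
		line, err := json.Marshal(v)
		if err != nil {
			return err
		}
		buf.Write(line)
		buf.WriteByte('\n')
	}
	return nil
}

// exampleBody builds the bulk body for the single document used in the answer.
func exampleBody() (string, error) {
	var buf bytes.Buffer
	doc := struct {
		Name string `json:"name"`
	}{Name: "jack"}
	if err := appendIndexAction(&buf, "my_index", "type", "my_doc_id", doc); err != nil {
		return "", err
	}
	return buf.String(), nil
}

func main() {
	body, err := exampleBody()
	if err != nil {
		panic(err)
	}
	// Prints two lines:
	// {"index":{"_id":"my_doc_id","_index":"my_index","_type":"type"}}
	// {"name":"jack"}
	fmt.Print(body)
}
```

In practice the client library handles this serialization; the point of the sketch is only to show what each flushed batch looks like on the wire.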

Regarding "elasticsearch - BulkIndexer in the Olivere package for Golang as a replacement for Elastigo", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/53424391/