gocql blocks with too many concurrent read requests (golang, Cassandra)

Reposted. Author: IT王子. Updated: 2023-10-29 02:21:57

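Using GOCQL (Golang, Cassandra), I issue up to 128 requests and then everything hangs. I believe I am releasing the queries correctly in my getTicksForCassandraKey() function, but I am not certain. GOCQL only supports up to 128 concurrent queries, so I must be doing something wrong. All of the queries are reads.

The main code is: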

inboundChannel := make(chan []bson.M, 30)
maxGoRoutinesCount := 30
chunkSize := int(math.Floor(float64(len(cassandraKeys)) / float64(maxGoRoutinesCount)))
log.Println("Chunk size will be: ", chunkSize)
log.Println("Cassandra Keys length: ", cassandraKeys)
idx := 0
for idx < len(cassandraKeys) {
    log.Println("idx: ", idx)
    chunkOfKeys := cassandraKeys[idx:(idx + chunkSize)]
    idx += chunkSize
    go func(keys []string) {
        log.Println("Received analysisKey on outbound channel. About to query this many keys: ", len(keys))
        //Cassandra session can handle up to 128 concurrent queries
        for _, c := range keys {
            processedTicks, err := getTicksForCassandraKey(*session, c, startTime)
            if err != nil {
                log.Println("Error returning.")
                return
            }
            log.Println("Finished query. About to post to inboundChannel for key: ", c)
            inboundChannel <- processedTicks
        }
    }(chunkOfKeys)
}

processedIndex := 0
for processedTicks := range inboundChannel {
    ticks = append(ticks, processedTicks...)
    log.Println("Got processed ticks from inboundChannel: ", processedIndex)
    processedIndex += 1
}
log.Println("End of function.")

The code for getTicksForCassandraKey is:

func getTicksForCassandraKey(cassandraSession gocql.Session, cassandraAnalysisKey string, startTime time.Time) (ticks []bson.M, err error) {
    log.Println("getTicksForCassandraKey: ", cassandraAnalysisKey)
    cassandraQuery := "select * from analysisdata where analysis_key = '" + cassandraAnalysisKey + "' and time > ? ALLOW FILTERING;"
    q := cassandraSession.Query(cassandraQuery, startTime)
    iter := q.Iter()
    var rawData string
    var rawAnalysisKey string
    var rawTime time.Time
    for iter.Scan(&rawAnalysisKey, &rawTime, &rawData) {
        processedAlgoTick, processingErr := processAlgoTick(rawAnalysisKey, rawTime, rawData)
        if processingErr != nil {
            err = processingErr
            return
        }
        ticks = append(ticks, processedAlgoTick)
    }
    iterCloseErr := iter.Close()
    q.Release()
    log.Println("Closed iter for analysis key: ", cassandraAnalysisKey)
    if iterCloseErr != nil {
        log.Println("Cassandra Iterator.Close() Error:", iterCloseErr)
    }
    return
}

Best answer

The inboundChannel in the main code was blocking. I moved it into a goroutine and used sync.WaitGroup to resolve the problem.
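
The answer does not include its code, so the following is only a sketch of one common way to arrange it, not the author's actual fix. In the question's main code nothing ever closes inboundChannel, so the `for ... range inboundChannel` loop cannot finish, and the slice expression `cassandraKeys[idx:(idx + chunkSize)]` can run past the end of the slice (or never advance when chunkSize is 0). The sketch below closes the channel from a separate goroutine once a sync.WaitGroup reports that all workers are done. The names chunkKeys, collectAll and fetchFunc are hypothetical, and fetchFunc merely stands in for getTicksForCassandraKey so the example runs without a Cassandra cluster.

```go
package main

import (
	"fmt"
	"sync"
)

// chunkKeys splits keys into at most n chunks of near-equal size.
// It never slices past the end of keys and never returns an empty chunk.
func chunkKeys(keys []string, n int) [][]string {
	if n <= 0 || len(keys) == 0 {
		return nil
	}
	size := (len(keys) + n - 1) / n // ceiling division, so size >= 1
	var chunks [][]string
	for start := 0; start < len(keys); start += size {
		end := start + size
		if end > len(keys) {
			end = len(keys)
		}
		chunks = append(chunks, keys[start:end])
	}
	return chunks
}

// fetchFunc is a hypothetical stand-in for getTicksForCassandraKey.
type fetchFunc func(key string) ([]string, error)

// collectAll queries the keys from at most `workers` goroutines and
// gathers every result into one slice.
func collectAll(keys []string, workers int, fetch fetchFunc) []string {
	results := make(chan []string, workers)
	var wg sync.WaitGroup

	for _, chunk := range chunkKeys(keys, workers) {
		wg.Add(1)
		go func(chunk []string) {
			defer wg.Done()
			for _, k := range chunk {
				rows, err := fetch(k)
				if err != nil {
					continue // assumption: skip a failed key; real code should report it
				}
				results <- rows
			}
		}(chunk)
	}

	// Close the channel once every worker has finished, so the
	// range loop below terminates instead of blocking forever.
	go func() {
		wg.Wait()
		close(results)
	}()

	var all []string
	for rows := range results {
		all = append(all, rows...)
	}
	return all
}

func main() {
	keys := []string{"a", "b", "c", "d", "e", "f", "g"}
	fake := func(key string) ([]string, error) {
		return []string{key + "-1", key + "-2"}, nil
	}
	ticks := collectAll(keys, 3, fake)
	fmt.Println(len(ticks)) // prints 14
}
```

Waiting in a separate goroutine matters here: if the main goroutine called wg.Wait() before draining the channel, the workers could block on a full channel and the program would deadlock again.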

For more on "gocql blocks with too many concurrent read requests (golang, Cassandra)", see the similar question on Stack Overflow: https://stackoverflow.com/questions/44184365/