gpt4 book ai didi

go - grpc 服务器在同时发送多条消息后停止接收消息

转载 作者:数据小太阳 更新时间:2023-10-29 03:08:17 25 4
gpt4 key购买 nike

我正在实现一个简单的 grpc 服务,其中任务摘要将被发送到 grpc 服务器。如果我发送的消息数量较少,一切正常,但是当我开始发送 5000 条消息时,服务器停止并在客户端收到超出截止日期的消息。我也尝试重新连接,但发现错误消息为。

rpc error: code = Unavailable desc = all SubConns are in TransientFailure, latest connection error: timed out waiting for server handshake

服务器未显示任何错误并且处于事件状态。

我也尝试设置 GRPC_GO_REQUIRE_HANDSHAKE=off 但错误仍然存​​在。我还实现了批量发送摘要,但重复了相同的场景。

grpc 发送消息的数量有限制吗?

这是我的服务原型(prototype)


// The Result service definition.
service Result {
rpc ConntectMaster(ConnectionRequest) returns (stream ExecutionCommand) {}
rpc postSummary(Summary) returns(ExecutionCommand) {}
}

message Summary{

int32 successCount = 1;
int32 failedCount = 2;
int32 startTime = 3;
repeated TaskResult results = 4;
bool isLast = 5;
string id = 6;
}

服务器中的postSummary实现

// PostSummary posts the summary to the master
func (server *Server) PostSummary(ctx context.Context, in *pb.Summary) (*pb.ExecutionCommand, error) {

for i := 0; i < len(in.Results); i++ {

res := in.Results[i]
log.Printf("%s --> %d Res :: %s, len : %d", in.Id, i, res.Id, len(in.Results))

}
return &pb.ExecutionCommand{Type: stopExec}, nil
}
func postSummaryInBatch(executor *Executor, index int) {
summary := pb.Summary{
SuccessCount: int32(executor.summary.successCount),
FailedCount: int32(executor.summary.failedCount),
Results: []*pb.TaskResult{},
IsLast: false,
}

if index >= len(executor.summary.TaskResults) {
summary.IsLast = true
return
}

var to int
batch := 500
if (index + batch) <= len(executor.summary.TaskResults) {
to = index + batch
} else {
to = len(executor.summary.TaskResults)
}
for i := index; i < to; i++ {
result := executor.summary.TaskResults[i]
taskResult := pb.TaskResult{
Id: result.id,
Msg: result.msg,
Time: result.time,
}
// log.Printf("adding res : %s ", taskResult.Id)

if result.err != nil {
taskResult.IsError = true
}
summary.Results = append(summary.Results, &taskResult)
}
summary.Id = fmt.Sprintf("%d-%d", index, to)
log.Printf("sent from %d to %d ", index, to)
postSummary(executor, &summary, 0)
postSummaryInBatch(executor, to)
}

func postSummary(executor *Executor, summary *pb.Summary, retryCount int) {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()

cmd, err := client.PostSummary(ctx, summary)
if err != nil {
if retryCount < 3 {
reconnect(executor)
postSummary(executor, summary, retryCount+1)
}
log.Printf(err.Error())
// log.Fatal("cannot send summary report")
} else {
processServerCommand(executor, cmd)
}
}

最佳答案

grpc 默认的 maxReceiveMessageSize 是 4MB,你的 grpc 客户端可能超过了这个限制。

grpc 在传输层使用 h2,它只打开一个 tcp conn 并在其上多路复用“请求”,与 h1 相比减少了显着的开销,我不会太担心批处理,只会对 grpc 服务器进行单独调用。

关于go - grpc 服务器在同时发送多条消息后停止接收消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57253797/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com