gpt4 book ai didi

go - gRPC下游的优雅关闭

转载 作者:IT王子 更新时间:2023-10-29 00:47:51 27 4
gpt4 key购买 nike

使用以下原型(prototype)缓冲区代码:

syntax = "proto3";

package pb;

message SimpleRequest {
int64 number = 1;
}

message SimpleResponse {
int64 doubled = 1;
}

// All the calls in this serivce preform the action of doubling a number.
// The streams will continuously send the next double, eg. 1, 2, 4, 8, 16.
service Test {
// This RPC streams from the server only.
rpc Downstream(SimpleRequest) returns (stream SimpleResponse);
}

我能够成功打开一个流,并不断从服务器获取下一个翻倍的数字。

我运行它的 go 代码如下所示:

ctxDownstream, cancel := context.WithCancel(ctx)
downstream, err := testClient.Downstream(ctxDownstream, &pb.SimpleRequest{Number: 1})
for {
responseDownstream, err := downstream.Recv()
if err != io.EOF {
println(fmt.Sprintf("downstream response: %d, error: %v", responseDownstream.Doubled, err))

if responseDownstream.Doubled >= 32 {
break
}
}
}
cancel() // !!This is not a graceful shutdown
println(fmt.Sprintf("%v", downstream.Trailer()))

我遇到的问题是使用上下文取消意味着我的 downstream.Trailer() 响应为空。有没有办法从客户端优雅地关闭此连接并接收 downstream.Trailer()。

注意:如果我从服务器端关闭下游连接,我的预告片就会被填充。但是我无法指示我的服务器端关闭这个特定的流。因此必须有一种方法可以优雅地关闭流客户端。

谢谢。

根据请求的一些服务器代码:

func (b *binding) Downstream(req *pb.SimpleRequest, stream pb.Test_DownstreamServer) error {
request := req

r := make(chan *pb.SimpleResponse)
e := make(chan error)
ticker := time.NewTicker(200 * time.Millisecond)
defer func() { ticker.Stop(); close(r); close(e) }()

go func() {
defer func() { recover() }()
for {
select {
case <-ticker.C:
response, err := b.Endpoint(stream.Context(), request)
if err != nil {
e <- err
}
r <- response
}
}
}()

for {
select {
case err := <-e:
return err
case response := <-r:
if err := stream.Send(response); err != nil {
return err
}
request.Number = response.Doubled
case <-stream.Context().Done():
return nil
}
}
}

您仍然需要用一些信息填充预告片。我使用 grpc.StreamServerInterceptor 来执行此操作。

最佳答案

根据grpc go文档

Trailer returns the trailer metadata from the server, if there is any. It must only be called after stream.CloseAndRecv has returned, or stream.Recv has returned a non-nil error (including io.EOF).

因此,如果您想在客户端中阅读预告片,请尝试这样的操作

ctxDownstream, cancel := context.WithCancel(ctx)
defer cancel()
for {
...
// on error or EOF
break;
}
println(fmt.Sprintf("%v", downstream.Trailer()))

出现错误时从无限循环中跳出并打印预告片。 cancel 将在函数结束时被调用,因为它是延迟的。

关于go - gRPC下游的优雅关闭,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49130970/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com