gpt4 book ai didi

go - 客户端断开连接时如何正确使用 ctx.Done()?

转载 作者:IT王子 更新时间:2023-10-29 00:47:09 28 4
gpt4 key购买 nike

如果客户端因网络错误而断开连接,在我的情况下服务器必须关闭 pub/sub 连接。我知道 ctx.Done() 函数,但不知道如何在我的案例中正确使用它。有人可以解释一下吗?

grpc-go: 1.7.0

go版本go1.8.4

func (a *API) Notifications(in *empty.Empty, stream pb.Service_NotificationsServer) error {
ctx := stream.Context()
_, ok := user.FromContext(ctx)
if !ok {
return grpc.Errorf(codes.Unauthenticated, "user not found")
}

pubsub := a.redisClient.Subscribe("notifications")
defer pubsub.Close()

for {
msg, err := pubsub.ReceiveMessage()
if err != nil {
grpclog.Warningf("Notifications: pubsub error: %v", err)
return grpc.Errorf(codes.Internal, "pubsub error %v", err)
}

notification := &pb.Notification{}
err = json.Unmarshal([]byte(msg.Payload), notification)
if err != nil {
grpclog.Warningf("Notifications: parse error: %v", err)
continue
}
if err := stream.Send(notification); err != nil {
grpclog.Warningf("Notifications: %v", err)
return err
}
grpclog.Infof("Notifications: send msg %v", notification)
}
}

最佳答案

您可以使用选择。不是从函数中正常获取数据,而是使用 channel 获取数据并使用 go 例程来处理数据。像这样的事情:

func (a *API) Notifications(in *empty.Empty, stream 
pb.Service_NotificationsServer) error {
ctx := stream.Context()
_, ok := user.FromContext(ctx)
if !ok {
return grpc.Errorf(codes.Unauthenticated, "user not found")
}

pubsub := a.redisClient.Subscribe("notifications")
defer pubsub.Close()

// I can not build the code, so I assume the msg in your code Message struct
c := make(chan Message)
go func() {
for {
msg, err := pubsub.ReceiveMessage()
if err != nil {
grpclog.Warningf("Notifications: pubsub error: %v", err)
close(c)
return grpc.Errorf(codes.Internal, "pubsub error %v", err)
}
c<- msg
}
}()

for {
select {
case msg, ok := <-c:
if !ok {
// channel is closed handle it
}
notification := &pb.Notification{}
err = json.Unmarshal([]byte(msg.Payload), notification)
if err != nil {
grpclog.Warningf("Notifications: parse error: %v", err)
continue
}
if err := stream.Send(notification); err != nil {
grpclog.Warningf("Notifications: %v", err)
return err
}
grpclog.Infof("Notifications: send msg %v", notification)
case <- ctx.Done():
// do exit logic. some how close the pubsub, so next
// ReceiveMessage() return an error
// if forget to do that the go routine runs for ever
// until the end of main(), which I think its not what you wanted
pubsub.Close() // Its just pseudo code
return
}
}
}

从 channel 读取消息(我假设类型是消息),并使用 select 的力量。

在这个场景中另外两个相关的事情:

  1. 确保 go 例程在完成此函数后结束。我无法猜测,因为我不知道代码,但我假设有一个 Close() 方法用于关闭 pubsub 所以下一个 ReceiveMessage 返回一个错误。 (我看到延迟完成了我希望的工作)

  2. 如果在 ctx.Done 之前 ReceiveMessage 中有错误,您可以关闭 channel 然后中断循环。

关于go - 客户端断开连接时如何正确使用 ctx.Done()?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46842887/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com