gpt4 book ai didi

grpc-java:正确处理客户端上的服务流调用重试

转载 作者:行者123 更新时间:2023-12-03 20:26:15 27 4
gpt4 key购买 nike

我正在尝试使用服务流和客户端上的异步 stub 在 grpc 上设置一个简单的发布/订阅模式。在将部分流消息实现回客户端后,我想处理连接断开的情况。现在我正在执行部分服务,例如关闭服务并且客户端应该从连接丢失中“恢复”。

我已经在 google/github/so 上阅读并搜索了有关重试机制的信息,并最终为流消息的服务中的方法设置了重试策略。据我了解,当服务返回重试策略中定义的一些 retryableStatusCodes 时,重试机制应该起作用。在客户端引入重试策略后,我想对其进行测试,以下两个场景的结果让我对重试感到困惑。

第一个场景:

  • 连接过程被调用(在 ~n 秒后有意没有消息流回客户端)
  • 服务被关闭
  • onError 不是 拜访客户
  • 服务又起来了
  • 连接 重试再次到达

  • 第二种情况:
  • 连接过程被调用(在 ~n 秒后第一条消息到达,消息在客户端的 onNext 处理程序中处理)
  • 服务被关闭
  • onError 拜访客户
  • 服务又起来了
  • 连接 不是 重试再次到达

  • 总的来说,让我感到困惑的是为什么这两种场景之间的行为存在差异?为什么在第一个场景中检测到服务器返回 UNAVAILABLE 并尝试重试,但在第二个场景中,即使状态相同,重试也不起作用?

    以下是客户端连接调用、服务连接方法和客户端重试策略设置的代码
    client:

    messageStub.withWaitForReady().connect(messagesRequest, new StreamObserver<>() {
    @Override
    public void onNext(MessageResponse messageResponse) {
    //process new message
    MessageDto message = new MessageDto();
    message.setBody(messageResponse.getBody());
    message.setTitle(messageResponse.getTitle());

    messageService.broadcastMessage(message);
    }

    @Override
    public void onError(Throwable throwable) {
    //service went down
    LOGGER.error(throwable.getStackTrace());
    }

    @Override
    public void onCompleted() {
    //This method should be called when user logs out of the application
    LOGGER.info(String.format("Message streaming terminated for user %d", userId));
    }
    });
    service:

    @Override
    public void connect(MessageRequest request, StreamObserver<MessageResponse> responseObserver) {
    Long userId = request.getUserId();

    ServerCallStreamObserver<MessageResponse > serverCallStreamObserver =
    (ServerCallStreamObserver<MessageResponse >) responseObserver;
    serverCallStreamObserver.setOnCancelHandler(getOnCancelHandler(userId));
    registerClient(userId, serverCallStreamObserver);
    //responseObserver.onCompleted() is left out so connection is not terminated
    }


    @EventListener
    public void listenForMessages(MessageEvent messageEvent) {
    //omitted code (just some data retrieving - populate conn and message vars)....

    MessageResponse.Builder builder = MessageResponse.newBuilder();
    StreamObserver<MessageResponse> observer = conn.getResponseObserver();
    builder.setType(message.getType());
    builder.setTitle(message.getTitle());
    builder.setBody(message.getBody());

    observer.onNext(builder.build())
    }

    retryPolicy:

    {
    "methodConfig" : [
    {
    "name": [
    {
    "service": "ch.example.proto.MessageService",
    "method": "connect"
    }
    ],
    "retryPolicy": {
    "maxAttempts": 10,
    "initialBackoff": "5s",
    "maxBackoff": "30s",
    "backoffMultiplier": 2,
    "retryableStatusCodes": ["UNAVAILABLE"]
    }
    }
    ]
    }

    最佳答案

    问题是接收消息会提交 RPC。这在 gRFC A6 Client Retries 中讨论.它提到了 Response-Headers ,当服务器用第一条消息响应时隐式发送。

    本质上,一旦 gRPC 将数据传回客户端,就无法自动重试。如果 gRPC 重试,它应该如何将新流与它已经响应的流结合起来?是否应该跳过第一个 N回应?但是如果现在的 react 不同呢?对于元数据(通过 Response-Headers 交付)来说,问题更严重,因为这些无法再次提供给客户端。

    gRPC 能够将客户端的请求重播到多个后端,但是一旦它开始接收来自后端的响应,它将“固定”到该后端并且无法更改其决定。

    您将需要应用程序级别的重试来重新建立流。当客户端重新建立流时,它可能需要修改请求以通知服务器客户端已经收到了哪些消息。

    关于grpc-java:正确处理客户端上的服务流调用重试,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60758900/

    27 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com