gpt4 book ai didi

java - gRPC 流被取消

转载 作者:行者123 更新时间:2023-11-30 07:45:39 24 4
gpt4 key购买 nike

我正在尝试通过 gRPC 创建一个流式 API,但我的 StreamObserver 由于某些奇怪的原因被取消了。这是我的 .proto声明:

service Service {
rpc connect (ConnectionRequest) returns (stream StreamResponse) {}

rpc act (stream ActRequest) returns (ActResponse) {}
}

这个想法是用户会 connect所以 StreamResponse将为每个用户保存,然后在每个 act 上保存StreamResponse将收到更新。这是 Service 的 java 实现

class ServiceImpl extends ServiceGrpc.ServiceImplBase {
............
@Override
public void connect(ConnectionRequest request, StreamObserver<StreamResponse> responseObserver) {
observers.add(responseObserver);
}

@Override
public StreamObserver<ActRequest> act(StreamObserver<ActResponse> responseObserver) {
return return new StreamObserver<ActRequest>() {
@Override
public void onNext(ActRequest actRequest) {
StreamResponse streamResponse = StreamResponse.newBuilder().build();
observers.forEach(o -> o.onNext(streamResponse));
actRequest..onCompleted();
}
...............
};
}
..................
}

这是客户端connect方法:

ServiceGrpc.ServiceStub stub = ServiceGrpc.newStub(channel);
new StreamObserver<PlayerResponse>() {
........
@Override
public void onNext(StreamResponse streamResponse) {
logger.info("Act: " streamResponse.getData());
}
........
}
ConnectionRequest request = ConnectionRequest.newBuilder().build();
stub.connect(request, streamObserver);

和客户端act方法:

StreamObserver<ActResponse> observer = new StreamObserver<ActResponse>() {
@Override
public void onNext(ActResponse actResponse) {
logger.info("Status: " + actResponse.getSuccess());
}
.........
};
StreamObserver<ActRequest> act = stub.act(observer);
act.onNext(MoveRequest.newBuilder().build());
act.onCompleted();

当我同时启动客户端和服务器时,客户端能够调用 connect .但是方法act只能在第一次调用。当客户端调用 act我第二次收到以下异常:

io.grpc.StatusRuntimeException: CANCELLED: call already cancelled
at io.grpc.Status.asRuntimeException(Status.java:517)
at io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:335)

在 Debug模式下我可以看到 StreamObserver<StreamResponse> responseObservercanceled=true .

最佳答案

我找到了为什么会出现这个错误。在客户端,我使用 Vaadin用于构建 UI。所以在StreamObserver<ActResponse>#onNext我正在调用一个抛出异常的 Vaadin 方法。

我在开发时犯的错误是我离开了 onError空的,因为它是一个 PoC。但是这个错误让我调试了几个小时。这是一个愚蠢的错误,但我会让它出现在这里,也许它可以为某些人节省一些时间。

关于java - gRPC 流被取消,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51554288/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com