gpt4 book ai didi

protocol-buffers - 大型数据集上的 gRPC 序列化缓慢

转载 作者:行者123 更新时间:2023-12-05 04:35:38 25 4
gpt4 key购买 nike

我知道谷歌声明 protobufs 不支持大消息 ( i.e. greater than 1 MB ),但我正在尝试使用 gRPC 流式传输一个数十兆字节的数据集,似乎有些人说它是 ok , 或者至少用 some splitting ...

但是,当我尝试以这种方式发送数组时(repeated uint32),在同一台本地计算机上大约需要 20 秒。

#proto
service PAS {
// analyze single file
rpc getPhotonRecords (PhotonRecordsRequest) returns (PhotonRecordsReply) {}
}

message PhotonRecordsRequest {
string fileName = 1;
}

message PhotonRecordsReply {
repeated uint32 PhotonRecords = 1;
}

其中 PhotonRecordsReply 的长度需要大约 1000 万 uint32...

有没有人知道如何加快速度?或者哪种技术更合适?

所以我认为我已经根据给出的评论和答案实现了流式处理,但它仍然需要相同的时间:

#proto
service PAS {
// analyze single file
rpc getPhotonRecords (PhotonRecordsRequest) returns (stream PhotonRecordsReply) {}
}
class PAS_GRPC(pas_pb2_grpc.PASServicer):

def getPhotonRecords(self, request: pas_pb2.PhotonRecordsRequest, _context):
raw_data_bytes = flb_tools.read_data_bytes(request.fileName)
data = flb_tools.reshape_flb_data(raw_data_bytes)
index = 0
chunk_size = 1024
len_data = len(data)
while index < len_data:
# last chunk
if index + chunk_size > len_data:
yield pas_pb2.PhotonRecordsReply(PhotonRecords=data[index:])
# all other chunks
else:
yield pas_pb2.PhotonRecordsReply(PhotonRecords=data[index:index + chunk_size])
index += chunk_size

最小重现 Github example

最佳答案

如果您将其更改为使用应该有所帮助的流。为我转移不到2秒。请注意,这是在没有 ssl 的情况下在本地主机上进行的。我把这段代码放在一起。我确实运行了它并且它起作用了。例如,不确定如果文件不是 4 字节的倍数会发生什么。此外,读取字节的字节顺序是 Java 的默认顺序。

我的 10 兆文件是这样制作的。

dd if=/dev/random  of=my_10mb_file bs=1024 count=10240

这是服务定义。我在此处添加的唯一内容是响应流。

service PAS {
// analyze single file
rpc getPhotonRecords (PhotonRecordsRequest) returns (stream PhotonRecordsReply) {}
}

这是服务器实现。

public class PhotonsServerImpl extends PASImplBase {

@Override
public void getPhotonRecords(PhotonRecordsRequest request, StreamObserver<PhotonRecordsReply> responseObserver) {
log.info("inside getPhotonRecords");

// open the file, I suggest using java.nio API for the fastest read times.
Path file = Paths.get(request.getFileName());
try (FileChannel fileChannel = FileChannel.open(file, StandardOpenOption.READ)) {

int blockSize = 1024 * 4;
ByteBuffer byteBuffer = ByteBuffer.allocate(blockSize);
boolean done = false;
while (!done) {
PhotonRecordsReply.Builder response = PhotonRecordsReply.newBuilder();
// read 1000 ints from the file.
byteBuffer.clear();
int read = fileChannel.read(byteBuffer);
if (read < blockSize) {
done = true;
}
// write to the response.
byteBuffer.flip();
for (int index = 0; index < read / 4; index++) {
response.addPhotonRecords(byteBuffer.getInt());
}
// send the response
responseObserver.onNext(response.build());
}
} catch (Exception e) {
log.error("", e);
responseObserver.onError(
Status.INTERNAL.withDescription(e.getMessage()).asRuntimeException());
}
responseObserver.onCompleted();
log.info("exit getPhotonRecords");

}
}

客户端只记录接收到的数组的大小。

public long getPhotonRecords(ManagedChannel channel) {
if (log.isInfoEnabled())
log.info("Enter - getPhotonRecords ");

PASGrpc.PASBlockingStub photonClient = PASGrpc.newBlockingStub(channel);

PhotonRecordsRequest request = PhotonRecordsRequest.newBuilder().setFileName("/udata/jdrummond/logs/my_10mb_file").build();

photonClient.getPhotonRecords(request).forEachRemaining(photonRecordsReply -> {
log.info("got this many photons: {}", photonRecordsReply.getPhotonRecordsCount());
});

return 0;
}

关于protocol-buffers - 大型数据集上的 gRPC 序列化缓慢,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/70993553/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com