gpt4 book ai didi

java - 网络 4.1 : Send multiple WebSocketFrame Fragments and Closer with ContinuationWebSocketFrame

转载 作者:行者123 更新时间:2023-12-02 01:22:18 26 4
gpt4 key购买 nike

我有一个Subscriber接收 sequential callbacks下一个可用负载和 subsequent callback当收到最后一个有效负载时。 (下面是简化代码)

我想要做的是在每个有效负载回调上发送 WebSocketFrame 片段,并在订阅者完成时关闭整个逻辑框架。

我已经尝试过:

  1. new TextWebSocketFrame(false, 0, buf) 开始,并以 new TextWebSocketFrame(true, 0, buf) 结束。
  2. new ContinuationWebSocketFrame(false, 0, buf) 开始,以 new TextWebSocketFrame(false, 0, buf) 继续,并以 new TextWebSocketFrame 结束(真,0,buf)
  3. new ContinuationWebSocketFrame(false, 0, buf) 开始,以 new TextWebSocketFrame(false, 0, buf) 继续,并以 new ContinuationWebSocketFrame 结束(真,0,buf)

...以及更多组合,但我的 Chrome 浏览器客户端(版本 76.0.3809.100(官方版本)(64 位))要么报告完全违反 WebSock 协议(protocol),要么提示已收到多个 ContinuationWebSocketFrame。

您能否为我指出正确的方向,以便使用正确配置的 WebSocketFrames 的正确顺序使其正常工作?

非常感谢。

static class DataReceiver implements Subscriber<Object> {
private Channel channel;
private final AtomicBoolean firstFrame = new AtomicBoolean(false);

public DataReceiver(Channel channel) {
this.channel = channel;
}

@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}

@Override
public void onNext(Object obj) {
ByteBuf buf = JSONOps.serialize(obj, channel.alloc().ioBuffer());
if (firstFrame.compareAndSet(false, true)) {
channel.writeAndFlush(new ContinuationWebSocketFrame(false, 0, buf));
} else {
channel.writeAndFlush(new TextWebSocketFrame(false, 0, buf));
}

}

@Override
public void onError(Throwable t) {
log.error("Subscriber Error", t);
// TODO: Handle error
}

@Override
public void onComplete() {
// channel.writeAndFlush(new ContinuationWebSocketFrame(true, 0, Unpooled.EMPTY_BUFFER));
channel.writeAndFlush(new TextWebSocketFrame(true, 0, Unpooled.EMPTY_BUFFER));

}
}

最佳答案

现在可以工作了。有 2 个问题。一是我没有 WebSocketFrameAggregator在我的管道中。其次,收盘ContinuationWebSocketFrame一定有一些内容。当我用 Unpooled.EMPTY_BUFFER 关闭聚合时它不起作用.

事实证明,无论如何我都需要在末尾插入一个 JSON 数组。

工作(简化的类)如下所示:

static class DataReceiver implements Subscriber<Object> {
private Channel channel;
private final AtomicBoolean firstFrame = new AtomicBoolean(false);

public DataReceiver(Channel channel) {
this.channel = channel;
}

@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}

@Override
public void onNext(Object obj) {
ByteBuf buf = JSONOps.serialize(obj, channel.alloc().ioBuffer(4196));
if (firstFrame.compareAndSet(false, true)) {
ByteBuf b = Unpooled.wrappedBuffer(Unpooled.wrappedBuffer("[".getBytes()), buf);
channel.writeAndFlush(new TextWebSocketFrame(false, 0, b));
} else {
ByteBuf b = Unpooled.wrappedBuffer(Unpooled.wrappedBuffer(",".getBytes()), buf);
channel.writeAndFlush(new ContinuationWebSocketFrame(false, 0, b));
}
}

@Override
public void onError(Throwable t) {
log.error("Subscriber Error", t);
// TODO: Handle error
}

@Override
public void onComplete() {
channel.writeAndFlush(new ContinuationWebSocketFrame(true, 0, Unpooled.wrappedBuffer("]".getBytes())));
}
}

关于java - 网络 4.1 : Send multiple WebSocketFrame Fragments and Closer with ContinuationWebSocketFrame,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57614555/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com