gpt4 book ai didi

java - 当我使用 ReplaySubject 中的 Observable 时阻止 ChannelHandlerContext

转载 作者:行者123 更新时间:2023-12-02 01:56:31 24 4
gpt4 key购买 nike

我正在编写客户端-服务器应用程序。我从数据库获取数据并将其放入 rxjava2 的 ReplaySubject (ReplaySubject 是必要的,因为我需要保证每个客户端上的数据相同),当客户端连接订阅它时,我想将此数据发送给他,但是当我在我的头“可能的方式^_^”它挡住了。我所说的 block 是指它不发送数据,但是当我关闭服务器时,数据会立即显示在客户端。

我尝试在客户端和服务器端事件循环中添加一些线程(我想可能是线程阻塞,因为我使用“无限”源,所以要接收它,我需要另一个线程或类似的东西)。

服务器端 channel 代码:

public
class ClientHandler
extends SimpleChannelInboundHandler<DataWrapper> {


private final Observable<DataWrapper> data;

public ClientHandler(Observable<DataWrapper> data) {
this.data = data;
}


@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
// super.channelRegistered(ctx);
final Channel channel = ctx.channel();
Server
.INSTANCE
.appendToChannelGroup(channel);

}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// super.channelActive(ctx);
// i believe there is something wrong
data.subscribe(ctx::writeAndFlush);
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
// rest skip
}

客户端:

public
class DirectNetworkCommunicator
extends SimpleChannelInboundHandler<DataWrapper> {


private Observable<DataWrapper> generatedData;
private ExecutorService fallbackThread;


DirectNetworkCommunicator(Observable<DataWrapper> generatedData) {
this.fallbackThread = Executors.newSingleThreadExecutor();
this.generatedData = generatedData;
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// super.channelRead(ctx, msg);
DataWrapper inComingData = (DataWrapper) msg;
Adapter
.INSTANCE
.appendFromNettworkData(inComingData);
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// super.channelReadComplete(ctx);
ctx.flush();
}
// rest skip
}

所以我之前提到过,我希望它能够在数据不存在时接收数据,而不是在服务器关闭时接收数据^_^。如果这对 netty 版本 4.1.37 最终版本有帮助。

最佳答案

好的,所以 future 的人们会面临同样的问题,我自己找到了答案。客户端的 Netty 使用后台线程作为通信的主要线程,这意味着我必须等待主线程释放才能对可观察的操作进行操作。希望它能帮助某人。

关于java - 当我使用 ReplaySubject 中的 Observable 时阻止 ChannelHandlerContext,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57395571/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com