gpt4 book ai didi

java - Netty 4 - 池返回一个尚未准备好发送实际消息的 channel

转载 作者:可可西里 更新时间:2023-11-01 02:50:09 25 4
gpt4 key购买 nike

我已经创建了一个 SimpleChannelInboundHandler 类型的入站处理程序并添加到管道中。我的意图是每次建立连接时,我想发送一条称为 session 打开消息的应用程序消息,并使连接准备好发送实际消息。为此,上面的入站处理程序越过发送 session 打开消息的 channelActive(),作为响应,我会收到一条 session 打开确认消息。只有在那之后,我才能发送任意数量的实际业务消息。我正在使用 FixedChannelPool 并按如下方式初始化。这在启动时效果很好。但是如果远程主机关闭连接,之后如果调用下面的 sendMessage() 发送消息,则该消息甚至在 session 打开消息之前通过 channelActive() 发送并获得其响应。因此服务器会忽略该消息,因为在发送业务消息时 session 尚未打开。

我正在寻找的是,池应该只返回那些已经调用 channelActive() 事件的 channel ,这些 channel 已经发送了 session 打开消息并且它已经从服务器获得了 session 打开确认消息。如何处理这种情况?

public class SessionHandler extends SimpleChannelInboundHandler<byte[]> {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
if (ctx.channel().isWritable()) {
ctx.channel().writeAndFlush("open session message".getBytes()).;
}
}
}

// At the time of loading the applicaiton
public void init() {
final Bootstrap bootStrap = new Bootstrap();
bootStrap.group(group).channel(NioSocketChannel.class).remoteAddress(hostname, port);
fixedPool = new FixedChannelPool(bootStrap, getChannelHandler(), 5);

// This is done to intialise connection and the channelActive() from above handler is invoked to keep the session open on startup
for (int i = 0; i < config.getMaxConnections(); i++) {
fixedPool.acquire().addListener(new FutureListener<Channel>() {
@Override
public void operationComplete(Future<Channel> future) throws Exception {
if (future.isSuccess()) {

} else {
LOGGER.error(" Channel initialzation failed...>>", future.cause());
}

}
});
}
}

//应用程序调用以下方法实际发送消息。

public void sendMessage(final String businessMessage) {
fixedPool.acquire().addListener(new FutureListener<Channel>() {
@Override
public void operationComplete(Future<Channel> future) throws Exception {
if (future.isSuccess()) {
Channel channel = future.get();
if (channel.isOpen() && channel.isActive() && channel.isWritable()) {
channel.writeAndFlush(businessMessage).addListener(new GenericFutureListener<ChannelFuture>() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
// success msg
} else {
// failure msg
}
}
});
fixedPool.release(channel);
}
} else {
// Failure
}
}
});
}

最佳答案

如果没有特定原因需要使用 FixedChannelPool,那么您可以使用其他数据结构(List/Map)来存储 Channels。您可以在发送打开 session 消息后向数据结构添加一个 channel ,并在 channelInactive 方法中将其删除。

如果您需要对 channel 执行批量操作,您可以使用 ChannelGroup为了这个目的。

如果您仍想使用FixedChannelPool,您可以在 channel 中设置一个关于是否发送打开消息的属性:

ctx.channel().attr(OPEN_MESSAGE_SENT).set(true);

您可以在您的sendMessage 函数中获取如下属性:

boolean sent = ctx.channel().attr(OPEN_MESSAGE_SENT).get();

并且在 channelInactive 中,您可以将其设置为 false 或将其删除。

注意 OPEN_MESSAGE_SENT 是一个 AttributeKey:

public static final AttributeKey<Boolean> OPEN_MESSAGE_SENT = AttributeKey.valueOf("OPEN_MESSAGE_SENT");

关于java - Netty 4 - 池返回一个尚未准备好发送实际消息的 channel ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53459500/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com