gpt4 book ai didi

java - Netty动态Channelhandler管道

转载 作者:行者123 更新时间:2023-12-02 10:51:34 25 4
gpt4 key购买 nike

我正在尝试在 Netty 4.X 中实现动态的 ChannelHandler 管道。有人建议“出于性能考虑,应在运行时直接调用处理程序,而不是修改管道”,于是我实现了一个服务器、一个 RouterInboundHandler 和一个客户端来验证这一做法,但它没有按预期工作。以下是我的代码:

服务器

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import org.apache.log4j.Logger;

/**
 * Minimal Netty TCP server used to exercise {@link RouterInboundHandler}.
 *
 * <p>Each accepted connection gets a {@link DelimiterBasedFrameDecoder} that splits the inbound
 * byte stream on {@link RouterInboundHandler#DELIMINATOR}, followed by a
 * {@link RouterInboundHandler} that dispatches every frame to a command-specific handler.
 */
public class Server implements Runnable {

    private static final Logger logger = Logger.getLogger(Server.class);

    /** TCP port the server binds to. */
    public static final int PORT = 9528;
    /** Command routed to {@code RouterInboundHandler.TimePrinterInboundHandler}. */
    public static final String TIME = "time";
    /** Command routed to {@code RouterInboundHandler.StringReverseHandler}. */
    public static final String REVERSE = "reverse";
    /** Not referenced by the routing code shown alongside this class. */
    public static final String ERROR = "error";

    /**
     * Binds to {@link #PORT} and blocks until the server channel is closed, then shuts down both
     * event loop groups.
     */
    @Override
    public void run() {
        final EventLoopGroup boss = new NioEventLoopGroup();
        final EventLoopGroup work = new NioEventLoopGroup();
        try {
            new ServerBootstrap()
                    .group(boss, work)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childHandler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(final NioSocketChannel ch) throws Exception {
                            // Frame first, then route: the router expects one complete command per message.
                            ch.pipeline().addLast(
                                    new DelimiterBasedFrameDecoder(1024 * 1024, RouterInboundHandler.DELIMINATOR));
                            ch.pipeline().addLast(new RouterInboundHandler());
                            // ch.pipeline().addLast(new RouterInboundHandler.StringWriterOutboundHandler());
                        }
                    })
                    .bind(PORT).sync()
                    .channel().closeFuture().sync();
        } catch (final InterruptedException ex) {
            // Restore the interrupt flag so whoever owns this thread can still observe the interruption.
            Thread.currentThread().interrupt();
            ex.printStackTrace();
        } catch (final Exception ex) {
            ex.printStackTrace();
        } finally {
            boss.shutdownGracefully();
            work.shutdownGracefully();
        }
    }

    /**
     * Starts the server on a dedicated thread.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        new Thread(new Server()).start();
    }

}

RouterInboundHandler

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import org.apache.log4j.Logger;

import java.util.Date;

/**
 * Inbound handler that decodes each framed {@link ByteBuf} into a command string and delegates it
 * to one of three private sub-handlers by calling them directly, instead of rewriting the
 * pipeline at runtime.
 *
 * <p>The sub-handlers are never added to the pipeline; they are invoked with this handler's own
 * {@link ChannelHandlerContext}, so anything they write starts from this handler's position.
 */
public class RouterInboundHandler extends SimpleChannelInboundHandler<ByteBuf> {

    private static final Logger logger = Logger.getLogger(RouterInboundHandler.class);

    /**
     * Frame delimiter (CR LF).
     *
     * <p>This is a single shared, mutable, reference-counted buffer. Writing it to a channel
     * advances its reader index and releases it, so callers that need to send the delimiter must
     * write a retained duplicate of it rather than this instance.
     */
    public static final ByteBuf DELIMINATOR = Unpooled.copiedBuffer("\r\n".getBytes());

    private final ChannelInboundHandler timer = new TimePrinterInboundHandler();
    private final ChannelInboundHandler string = new StringReverseHandler();
    private final ChannelInboundHandler error = new ErrorInboundHander();

    @Override
    public void channelActive(final ChannelHandlerContext ctx) throws Exception {
        System.out.println("Connection made");
    }

    @Override
    public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
        System.out.println("OOOPS");
    }

    /**
     * Decodes one frame into a command and forwards it to the matching sub-handler.
     *
     * @param ctx context of this handler; reused as the context of the delegated call
     * @param msg one complete frame with the delimiter already stripped by the frame decoder
     */
    @Override
    protected void channelRead0(final ChannelHandlerContext ctx, final ByteBuf msg) throws Exception {
        final byte[] data = new byte[msg.readableBytes()];
        msg.readBytes(data);
        // Platform-default charset, matching what the client uses when encoding.
        final String command = new String(data);

        // channelRead (not channelRead0) is the public entry point; the sub-handler type-checks
        // the String and dispatches to its own channelRead0.
        if (command.equals(Server.TIME)) {
            timer.channelRead(ctx, command);
        } else if (command.equals(Server.REVERSE)) {
            string.channelRead(ctx, command);
        } else {
            error.channelRead(ctx, command);
        }
    }

    @Override
    public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    /**
     * Outbound handler that encodes {@link String} messages into buffers.
     *
     * <p>Messages of any other type are passed through untouched; the inbound handlers in this
     * file already write {@link ByteBuf}s, which must not be dropped.
     */
    public static final class StringWriterOutboundHandler extends ChannelOutboundHandlerAdapter {
        @Override
        public void write(final ChannelHandlerContext ctx,
                          final Object msg, final ChannelPromise promise) throws Exception {
            if (msg instanceof String) {
                System.out.println("I am writing " + msg + " to the client");
                // Propagate the caller's promise so its listeners learn about success or failure.
                ctx.writeAndFlush(Unpooled.copiedBuffer(((String) msg).getBytes()), promise);
            } else {
                // Forward instead of swallowing: dropping the message would leak it and leave
                // the promise forever incomplete.
                ctx.write(msg, promise);
            }
        }
    }

    /** Replies with the current time followed by the received command. */
    public static class TimePrinterInboundHandler extends SimpleChannelInboundHandler<String> {

        @Override
        protected void channelRead0(final ChannelHandlerContext ctx, final String msg) throws Exception {
            System.out.println("I received message " + msg);
            final String time = new Date().toString();
            System.out.println("TimePrinterInboundHandler invoked");
            ctx.writeAndFlush(Unpooled.copiedBuffer((time + " @ " + msg).getBytes()));
        }
    }

    /** Replies with the received command reversed character by character. */
    public static class StringReverseHandler extends SimpleChannelInboundHandler<String> {

        @Override
        protected void channelRead0(final ChannelHandlerContext ctx, final String msg) throws Exception {
            // Reverse characters, not encoded bytes: reversing bytes corrupts multi-byte characters.
            final String reversed = new StringBuilder(msg).reverse().toString();
            System.out.println("StringReverseHandler invoked");
            ctx.writeAndFlush(Unpooled.copiedBuffer(reversed.getBytes()));
        }
    }

    /** Replies with an error message echoing the unrecognised command. */
    public static class ErrorInboundHander extends SimpleChannelInboundHandler<String> {

        @Override
        protected void channelRead0(final ChannelHandlerContext ctx,
                                    final String msg) throws Exception {
            System.out.println("ErrorInboundHandler invoked");
            ctx.writeAndFlush(
                    Unpooled.copiedBuffer(("Error appears, here is what you gave me [" + msg + "]").getBytes()));
        }
    }

}

以及客户端

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

public class Client implements Runnable{

private ChannelHandlerContext channelHandlerContext;

@Override
public void run() {

final EventLoopGroup work = new NioEventLoopGroup();

try {
new Bootstrap()
.group(work)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(final NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelActive(final ChannelHandlerContext ctx) throws Exception {
Client.this.channelHandlerContext = ctx;
}

@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
if(msg instanceof ByteBuf){
final byte[] data = new byte[((ByteBuf) msg).readableBytes()];
((ByteBuf) msg).readBytes(data);
System.out.println(new String(data));
} else {
System.out.println("No");
}
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("ChannelReadComplete in Client");
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
System.out.println("Here");
}
});
}
}).connect("localhost", Server.PORT).sync().channel().closeFuture().sync();
} catch (final Exception ex){
ex.printStackTrace();
} finally {
work.shutdownGracefully();
}

}

public ChannelHandlerContext getChannelHandlerContext() {
return channelHandlerContext;
}

public void writeMessage(final String... message){
for(final String msg : message) {
channelHandlerContext.write(Unpooled.copiedBuffer(msg.getBytes()));
channelHandlerContext.write(RouterInboundHandler.DELIMINATOR);
}
channelHandlerContext.flush();
}

public static void main(String[] args) throws InterruptedException {
final Client client = new Client();
new Thread(client).start();
Thread.sleep(2000);
// client.writeMessage(Server.TIME, Server.REVERSE, "Hello World");
client.writeMessage("Hello World", Server.TIME, Server.REVERSE);
}

}

如代码所示,ChannelInboundHandler 的三个子类是在 Channel 的连接初始化阶段创建的。当客户端向服务器发送消息时,channelRead0 将检查命令并相应地运行不同的处理程序。

我的问题是,这是否是动态使用 Netty 管道的正确方法?以及为什么只有来自客户端的第一个请求得到响应?

最佳答案

如果没有更多信息,很难说你到底想做什么,但我怀疑问题是以下代码将尝试多次写入相同的 ByteBuf:


channelHandlerContext.write(RouterInboundHandler.DELIMINATOR);

由于多种原因,这是有问题的:

1) 每次写入共享的是同一个 ByteBuf 的 readerIndex/writerIndex,这些索引会在写入时被更新,因此后续写入的内容可能与预期不同。

2) ByteBuf 在写入完成后会被释放,因此当您尝试再次写入它时,可能会收到 IllegalReferenceCountException。

任何写入错误都会反映在 write(...) 返回的 ChannelFuture 中。只需添加一个 ChannelFutureListener 并检查写入是否失败即可。

因此,如果没有更多信息,我认为您应该这样做:

channelHandlerContext.write(RouterInboundHandler.DELIMINATOR.retainedDuplicate());

关于java - Netty动态Channelhandler管道,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52145092/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com