gpt4 book ai didi

java - Netty channel 写入未到达处理程序

转载 作者:可可西里 更新时间:2023-11-01 02:48:13 24 4
gpt4 key购买 nike

我正在学习 Netty 并制作一个通过 TCP 发送对象的简单应用程序的原型(prototype)。我的问题是,当我使用消息从服务器端调用 Channel.write 时,它似乎没有到达管道中的处理程序。当我从客户端向服务器发送消息时,它按预期工作。

这是代码。

服务器:

public class Main {     
private int serverPort;

private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;

private ServerBootstrap boot;
private ChannelFuture future;

private SomeDataChannelDuplexHandler duplex;

private Channel ch;

public Main(int serverPort) {
this.serverPort = serverPort;
}

public void initialise() {
boot = new ServerBootstrap();
bossGroup = new NioEventLoopGroup();
workerGroup = new NioEventLoopGroup();

boot.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("idleStateHandler", new IdleStateHandler(0, 0, 2));

// Inbound
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 0));
ch.pipeline().addLast(new SomeDataDecoder());

// Outbound
ch.pipeline().addLast(new LengthFieldPrepender(2));
ch.pipeline().addLast(new SomeDataEncoder());

// In-Out
ch.pipeline().addLast(new SomeDataChannelDuplexHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
}

public void sendMessage() {
SomeData fd = new SomeData("hello", "localhost", 1234);
ChannelFuture future = ch.writeAndFlush(fd);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
System.out.println("send error: " + future.cause().toString());
} else {
System.out.println("send message ok");
}
}
});
}

public void startServer(){
try {
future = boot.bind(serverPort)
.sync()
.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
ch = future.channel();
}
});
} catch (InterruptedException e) {
// log failure
}
}

public void stopServer() {
workerGroup.shutdownGracefully()
.addListener(e -> System.out.println("workerGroup shutdown"));

bossGroup.shutdownGracefully()
.addListener(e -> System.out.println("bossGroup shutdown"));
}

public static void main(String[] args) throws InterruptedException {

Main m = new Main(5000);

m.initialise();
m.startServer();

final Scanner scanner = new Scanner(System.in);

System.out.println("running.");

while (true) {

final String input = scanner.nextLine();

if ("q".equals(input.trim())) {
break;
} else {
m.sendMessage();
}
}

scanner.close();
m.stopServer();
}
}

双工 channel 处理程序:

public class SomeDataChannelDuplexHandler extends ChannelDuplexHandler {

@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("duplex channel active");
ctx.fireChannelActive();
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("duplex channelRead");
if (msg instanceof SomeData) {
SomeData sd = (SomeData) msg;
System.out.println("received: " + sd);
} else {
System.out.println("some other object");
}
ctx.fireChannelRead(msg);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.ALL_IDLE) { // idle for no read and write
System.out.println("idle: " + event.state());
}
}
}
}

最后是编码器(解码器类似):

public class SomeDataEncoder extends MessageToByteEncoder<SomeData> {

@Override
protected void encode(ChannelHandlerContext ctx, SomeData msg, ByteBuf out) throws Exception {

System.out.println("in encoder, msg = " + msg);
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);

oos.writeObject(msg.getName());
oos.writeObject(msg.getIp());
oos.writeInt(msg.getPort());
oos.close();

byte[] serialized = bos.toByteArray();
int size = serialized.length;

ByteBuf encoded = ctx.alloc().buffer(size);
encoded.writeBytes(bos.toByteArray());

out.writeBytes(encoded);
}
}

客户端:

public class Client {

String host = "10.188.36.66";
int port = 5000;

EventLoopGroup workerGroup = new NioEventLoopGroup();
ChannelFuture f;
private Channel ch;

public Client() {
}

public void startClient() throws InterruptedException {
Bootstrap boot = new Bootstrap();
boot.group(workerGroup);
boot.channel(NioSocketChannel.class);
boot.option(ChannelOption.SO_KEEPALIVE, true);
boot.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
// Inbound
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 0));
ch.pipeline().addLast(new SomeDataDecoder());

// Outbound
ch.pipeline().addLast(new LengthFieldPrepender(2));
ch.pipeline().addLast(new SomeDataEncoder());

// Handler
ch.pipeline().addLast(new SomeDataHandler());
}
});

// Start the client
f = boot.connect(host, port).sync();
f.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
System.out.println("connected to server");
ch = f.channel();
}
});
}

public void stopClient() {
workerGroup.shutdownGracefully();
}

private void writeMessage(String input) {
SomeData data = new SomeData("client", "localhost", 3333);
ChannelFuture fut = ch.writeAndFlush(data);
fut.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
System.out.println("send message");
}
});
}

public static void main(String[] args) throws InterruptedException {
Client client = new Client();
client.startClient();

System.out.println("running.\n\n");
final Scanner scanner = new Scanner(System.in);

while (true) {

final String input = scanner.nextLine();

if ("q".equals(input.trim())) {
break;
} else {
client.writeMessage(input);
}
}

scanner.close();
client.stopClient(); //call this at some point to shutdown the client
}

}

和处理程序:

public class SomeDataHandler extends SimpleChannelInboundHandler<SomeData> {

private ChannelHandlerContext ctx;

@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("connected");
this.ctx = ctx;
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, SomeData msg) throws Exception {
System.out.println("got message: " + msg);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
System.out.println("caught exception: " + cause.getMessage());
ctx.close();
}
}

当我通过服务器端的控制台发送消息时,我得到输出:

running.
duplex channel active
duplex read
idle: ALL_IDLE
idle: ALL_IDLE

send message ok

因此看起来好像消息已发送但客户端未收到任何内容。

当我从客户端执行此操作时(在服务器控制台上):

in decoder, numBytes in message = 31
duplex channelRead
received: SomeData [name=client, ip=localhost, port=3333]

这是我所期望的。

那么问题出在哪里呢?是否与在服务器端使用 ChannelDuplexHandler 和在客户端使用 SimpleChannelInboundHandler 有关?我需要调用什么来将消息踢下管道吗?

更新我在服务器 sendMessage 方法中添加了对 future.isSuccess() 的检查,我得到了在控制台发送错误:java.lang.UnsupportedOperationException

最佳答案

(代表 OP 发布)

对于任何感兴趣的人来说,问题是我试图在服务器 channel 而不是正常 channel 上发送消息。 This post为我指明了正确的方向。

关于java - Netty channel 写入未到达处理程序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42246482/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com