gpt4 book ai didi

java - 使用 Netty 创建一对多的 TCP 代理

转载 作者:可可西里 更新时间:2023-11-01 02:33:12 27 4
gpt4 key购买 nike

我正在尝试创建一个 TCP 代理,使用 Netty/Java 将请求转发到许多其他 TCP 端点。

例如:

                     /--> SERVER A 
Client A --> PROXY --
\--> SERVER B

如果客户端A通过代理发送一个TCP命令,代理打开两个到服务器A服务器B的TCP连接,并发将 Client A 发送的请求代理给它们。

如果 Client A 随后发送另一个命令,理论上代理先前已经将这两个连接缓存在一个池中,因此无需再次打开两个新连接,将请求代理到两个服务器。

关于响应处理,我想有两个选择:

  • 依次显示对客户端A的两个响应。
  • 或者完全忽略响应。

如果连接丢失或关闭,代理应该能够自动重新创建它并将其添加回连接池。

我一直在查看 Netty 示例,并尝试使用 ChannelGroup处理连接池,但没有成功。此外,在我下面的代码中,在发送第一个请求后,代理停止工作。有什么建议吗?

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOption;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.util.LinkedList;
import java.util.List;

public class TcpProxyHandler extends ChannelInboundHandlerAdapter {

private static List<String> hosts = new LinkedList<>();
private static List<String> connected = new LinkedList<>();

static {
hosts.add("127.0.0.1:10000");
hosts.add("127.0.0.1:20000");
}

static final ChannelGroup channels = new DefaultChannelGroup(
GlobalEventExecutor.INSTANCE);

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
final Channel inboundChannel = ctx.channel();

for (String host : hosts) {
if (!connected.contains(host)) {
String address = host.split(":")[0];
int port = Integer.parseInt(host.split(":")[1]);
Channel outboundChannel = ConnectionPool.getConnection(address,
port);
if (outboundChannel == null) {
Bootstrap b = new Bootstrap();
b.group(inboundChannel.eventLoop())
.channel(ctx.channel().getClass())
.handler(new TcpProxyBackendHandler(inboundChannel))
.option(ChannelOption.AUTO_READ, false);
ChannelFuture f = b.connect(address, port);
outboundChannel = f.channel();
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future)
throws Exception {
if (future.isSuccess()) {
// connection complete start to read first data
inboundChannel.read();
} else {
// Close the connection if the connection
// attempt
// has failed.
inboundChannel.close();
}
}
});

channels.add(outboundChannel);
connected.add(host);
System.out.println("Connected to " + host);
}
}

}

}

@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg)
throws Exception {
channels.flushAndWrite(msg);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
cause.printStackTrace();
ctx.close();
}

static void closeOnFlush(Channel ch) {
if (ch.isActive()) {
ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(
ChannelFutureListener.CLOSE);
}
}

static class TcpProxyBackendHandler extends ChannelInboundHandlerAdapter {

private final Channel inboundChannel;

public TcpProxyBackendHandler(Channel inboundChannel) {
this.inboundChannel = inboundChannel;
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.read();
ctx.write(Unpooled.EMPTY_BUFFER);
}

@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg)
throws Exception {
inboundChannel.writeAndFlush(msg).addListener(
new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future)
throws Exception {
if (future.isSuccess()) {
ctx.channel().read();
} else {
future.channel().close();
}
}
});
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
TcpProxyHandler.closeOnFlush(inboundChannel);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
cause.printStackTrace();
TcpProxyHandler.closeOnFlush(ctx.channel());
}

}

}

最佳答案

您可以尝试在另一个线程中调用 connect() 和 read() 来让 ChannelGrop 的 worker 有机会完成它的工作。

关于java - 使用 Netty 创建一对多的 TCP 代理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/19236925/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com