gpt4 book ai didi

java - RxNetty 重用连接

转载 作者:塔克拉玛干 更新时间:2023-11-02 19:07:51 25 4
gpt4 key购买 nike

我想在没有 Spring Cloud 的情况下使用 Netflix-Ribbon 作为 TCP 客户端负载均衡器,并且我编写了测试代码。

public class App  implements Runnable
{
public static String msg = "hello world";
public BaseLoadBalancer lb;
public RxClient<ByteBuf, ByteBuf > client;
public Server echo;

App(){
lb = new BaseLoadBalancer();
echo = new Server("localhost", 8000);
lb.setServersList(Lists.newArrayList(echo));
DefaultClientConfigImpl impl = DefaultClientConfigImpl.getClientConfigWithDefaultValues();
client = RibbonTransport.newTcpClient(lb, impl);
}
public static void main( String[] args ) throws Exception
{
for( int i = 40; i > 0; i--)
{
Thread t = new Thread(new App());
t.start();
t.join();
}
System.out.println("Main thread is finished");
}
public String sendAndRecvByRibbon(final String data)
{
String response = "";
try {
response = client.connect().flatMap(new Func1<ObservableConnection<ByteBuf, ByteBuf>,
Observable<ByteBuf>>() {
public Observable<ByteBuf> call(ObservableConnection<ByteBuf, ByteBuf> connection) {

connection.writeStringAndFlush(data);
return connection.getInput();
}
}).timeout(1, TimeUnit.SECONDS).retry(1).take(1)
.map(new Func1<ByteBuf, String>() {
public String call(ByteBuf ByteBuf) {
return ByteBuf.toString(Charset.defaultCharset());
}
})
.toBlocking()
.first();
}
catch (Exception e) {
System.out.println(((LoadBalancingRxClientWithPoolOptions) client).getMaxConcurrentRequests());
System.out.println(lb.getLoadBalancerStats());
}
return response;
}
public void run() {
for (int i = 0; i < 200; i++) {
sendAndRecvByRibbon(msg);
}
}

}

我发现每次调用sendAndRecvByRibbon 时它都会创建一个新套接字,即使poolEnabled 设置为true。所以,这让我很困惑,我想念什么?并且没有配置池大小的选项,但是有一个 PoolMaxThreadsMaxConnectionsPerHost

我的问题是如何在我的简单代码中使用连接池,我的sendAndRecvByRibbon有什么问题,它打开一个套接字然后只使用一次,我如何重用连接?谢谢你的时间。

服务器只是一个用pyhton3写的简单的回显服务器,我注释掉了conn.close()因为我想使用长连接。

import socket
import threading
import time
import socketserver
class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
def handle(self):
conn = self.request
while True:
client_data = conn.recv(1024)
if not client_data:
time.sleep(5)
conn.sendall(client_data)
# conn.close()

class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
pass

if __name__ == "__main__":
HOST, PORT = "localhost", 8000
server = ThreadedTCPServer((HOST, PORT), ThreadedTCPRequestHandler)
ip, port = server.server_address
server_thread = threading.Thread(target=server.serve_forever)
server_thread.daemon = True
server_thread.start()
server.serve_forever()

和mevan的pom,我只是在IED自动生成的POM中添加了两个依赖。

<dependency>
<groupId>commons-configuration</groupId>
<artifactId>commons-configuration</artifactId>
<version>1.6</version>
</dependency>
<dependency>
<groupId>com.netflix.ribbon</groupId>
<artifactId>ribbon</artifactId>
<version>2.2.2</version>
</dependency>

打印src_port的代码

@Sharable
public class InHandle extends ChannelInboundHandlerAdapter {
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(ctx.channel().localAddress());
super.channelRead(ctx, msg);
}
}

public class Pipeline implements PipelineConfigurator<ByteBuf, ByteBuf> {
public InHandle handler;
Pipeline() {
handler = new InHandle();
}
public void configureNewPipeline(ChannelPipeline pipeline) {
pipeline.addFirst(handler);
}
}

并将 client = RibbonTransport.newTcpClient(lb, impl); 更改为 Pipeline pipe = new Pipeline();client = RibbonTransport.newTcpClient(lb, pipe, impl, new DefaultLoadBalancerRetryHandler (impl));

最佳答案

因此,您的 App() 构造函数执行 lb/client/etc 的初始化。

然后您通过在第一个 for 循环中调用 new App() 来启动 40 个不同的线程和 40 个不同的 RxClient 实例(默认情况下每个实例都有自己的池)。为了清楚起见——您在此处生成多个 RxClient 实例的方式不允许它们共享任何公共(public)池。尝试改用一个 RxClient 实例。

如果你像下面这样改变你的主要方法,它会停止创建额外的套接字吗?

    public static void main( String[] args ) throws Exception
{
App app = new App() // Create things just once
for( int i = 40; i > 0; i--)
{
Thread t = new Thread(()->app.run()); // pass the run()
t.start();
t.join();
}
System.out.println("Main thread is finished");
}

如果以上没有完全帮助(至少它会减少 40 次创建的套接字数)- 你能澄清一下你是如何确定的吗:

i find it will create a new socket everytime i call sendAndRecvByRibbon

在使用此行更新构造函数后,您的测量结果是什么:

    DefaultClientConfigImpl impl = DefaultClientConfigImpl.getClientConfigWithDefaultValues();
impl.set(CommonClientConfigKey.PoolMaxThreads,1); //Add this one and test

更新

是的,查看 sendAndRecvByRibbon 似乎缺少标记 PooledConnection作为no longer acquired通过调用 close 一旦你不希望从它进一步读取。

只要你期望唯一的单一读取事件,只需更改这一行

connection.getInput()

return connection.getInput().zipWith(Observable.just(connection), new Func2<ByteBuf, ObservableConnection<ByteBuf, ByteBuf>, ByteBuf>() {
@Override
public ByteBuf call(ByteBuf byteBuf, ObservableConnection<ByteBuf, ByteBuf> conn) {
conn.close();
return byteBuf;
}
});

请注意,如果您要通过 TCP 设计更复杂的协议(protocol),则可以针对您的特定“通信结束”标志分析输入 bytebuf,这表明连接可以返回到池中。

关于java - RxNetty 重用连接,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51856938/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com