java - Netty and ScheduledExecutorService

Reposted. Author: 行者123. Updated: 2023-12-02 03:28:51

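I'm trying to create a TCP server that periodically reads data from a database (Redis) and sends it to the appropriate client.

However, since I'm fairly new to Netty, I don't know how to schedule this. I do know that I need to use a scheduled executor service like this: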

ScheduledExecutorService e = Executors.newSingleThreadScheduledExecutor();
e.scheduleAtFixedRate(() -> {
    System.out.println("Calling...");
    // Do something
}, 1, 1, TimeUnit.SECONDS);

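However, when I put that into the server code, the method is only called once. I've tried placing it in different spots, but I still can't get it right. What should I do?

Here is the server code: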

package com.example.test.app;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class Server {

    public static void main(String[] args) throws Exception
    {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        final ServerHandler handler = new ServerHandler();

        try {

            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup);
            b.channel(NioServerSocketChannel.class);
            b.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception
                {
                    ch.pipeline().addLast(handler);
                }

            });
            b.option(ChannelOption.SO_BACKLOG, 128);
            b.childOption(ChannelOption.SO_KEEPALIVE, true);

            ScheduledExecutorService e = Executors.newSingleThreadScheduledExecutor();
            e.scheduleAtFixedRate(() -> {
                System.out.println("Calling...");
                handler.saySomething();
            }, 1, 1, TimeUnit.SECONDS);

            ChannelFuture f = b.bind(1337).sync();
            f.channel().closeFuture().sync();

        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

}

Here is the server handler:

package com.example.test.app;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class ServerHandler extends ChannelInboundHandlerAdapter {

    private ChannelHandlerContext ctx;

    @Override
    public void channelActive(ChannelHandlerContext ctx)
    {
        this.ctx = ctx;
        System.out.println("Someone's connected!");
    }

    public void saySomething()
    {
        final ChannelFuture f = ctx.writeAndFlush("Sup!");
        f.addListener((ChannelFutureListener) (ChannelFuture future) -> {
            System.out.println("Something has been said!");
        });
    }

}

Best answer

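The method saySomething() throws a NullPointerException at final ChannelFuture f = ctx.writeAndFlush("Sup!"); because ctx is still null: the scheduled task first fires one second after startup, before any client has connected and before channelActive() has set the field. The javadoc of EventExecutorGroup.scheduleAtFixedRate says "If any execution of the task encounters an exception, subsequent executions are suppressed." That is why you only get called once...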
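The suppression behaviour can be reproduced without Netty. The following is a minimal sketch using only the JDK's ScheduledExecutorService; the class name SuppressedTaskDemo and the method countRuns are made up for illustration, and the null Object merely stands in for the handler's unset ctx field. It also shows one possible guard (catching the exception inside the task), which is an addition of this sketch rather than part of the original answer.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the question's code.
class SuppressedTaskDemo {

    /** Schedules a task that always throws and returns how many times it actually ran. */
    static int countRuns(boolean guarded) {
        ScheduledExecutorService e = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger calls = new AtomicInteger();

        Runnable failing = () -> {
            calls.incrementAndGet();
            Object ctx = null;   // stands in for the handler's unset ctx field
            ctx.toString();      // throws NullPointerException
        };

        Runnable task = failing;
        if (guarded) {
            // Catch inside the task so the exception never reaches the executor.
            task = () -> {
                try {
                    failing.run();
                } catch (RuntimeException ex) {
                    // A real server would log this and carry on.
                }
            };
        }

        try {
            e.scheduleAtFixedRate(task, 10, 10, TimeUnit.MILLISECONDS);
            Thread.sleep(300); // give the task time to fire repeatedly
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        } finally {
            e.shutdownNow();
        }
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println("unguarded runs: " + countRuns(false));
        System.out.println("guarded runs:   " + countRuns(true));
    }
}
```

The unguarded task runs once and is then silently dropped, which matches the single "Calling..." seen in the question; the guarded task keeps firing. Catching the exception only hides the symptom here, so the null ctx still has to be dealt with.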

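Also, it seems Netty only lets you reuse one handler instance across different pipelines if you annotate the handler class with @Sharable. Otherwise an exception is thrown. If your handler is stateless (which is not your case, since your handler has the ctx member), you should annotate it with @Sharable and reuse it for every pipeline that gets created. If it is stateful, create a new instance for each new pipeline (that is, for each new client connection).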

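Finally, to schedule a task for each connected client, you can use the executor obtained from the ctx of that client's channel (by default the channel's EventLoop) in your channelActive() implementation. That executor implements ScheduledExecutorService, so you have scheduleAtFixedRate there as well. Take a look at my version of the code and see whether it works for you.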

Server:

package com.example.test.app;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringEncoder;

public class Server {

    public static void main(String[] args) throws Exception
    {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {

            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup);
            b.channel(NioServerSocketChannel.class);
            b.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception
                {
                    // Added here: NIO channels only accept ByteBuf/FileRegion, so the
                    // String written by the handler needs an encoder in front of it.
                    ch.pipeline().addLast(new StringEncoder());
                    // The handler is stateful, so each connection gets its own instance.
                    ch.pipeline().addLast(new ServerHandler());
                }

            });
            b.option(ChannelOption.SO_BACKLOG, 128);
            b.childOption(ChannelOption.SO_KEEPALIVE, true);

            // The external ScheduledExecutorService from the question is no longer
            // needed: each ServerHandler schedules its own task in channelActive().

            ChannelFuture f = b.bind(1337).sync();
            f.channel().closeFuture().sync();

        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

}

Server handler:

package com.example.test.app;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.concurrent.ScheduledFuture;

import java.util.concurrent.TimeUnit;

public class ServerHandler extends ChannelInboundHandlerAdapter {

    // Handle to this connection's periodic task, kept so it can be cancelled on disconnect.
    private ScheduledFuture<?> sf;

    @Override
    public void channelActive(ChannelHandlerContext ctx)
    {
        System.out.println("Someone's connected! " + ctx.channel());
        // Run on the channel's own executor: first call after 1 s, then every second.
        sf = ctx.executor().scheduleAtFixedRate(() -> {
            System.out.println("Calling...");
            saySomething(ctx);
        }, 1, 1, TimeUnit.SECONDS);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        System.out.println("Someone's disconnected! " + ctx.channel());
        // Stop writing to a closed channel; let a run already in progress finish.
        if (sf != null) {
            sf.cancel(false);
        }
    }

    private void saySomething(ChannelHandlerContext ctx)
    {
        final ChannelFuture f = ctx.writeAndFlush("Sup!");
        f.addListener((ChannelFutureListener) (ChannelFuture future) -> {
            // The listener fires on failure too, so report which one happened.
            if (future.isSuccess()) {
                System.out.println("Something has been said!");
            } else {
                System.out.println("Write failed: " + future.cause());
            }
        });
    }

}
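The lifecycle the handler follows (start a periodic task when a client connects, cancel it when the client goes away) can also be tried out without Netty. What follows is only a plain-JDK analogy under stated assumptions: PerClientScheduler, onConnect, onDisconnect and demo are hypothetical names, a String id stands in for the channel, and a shared ScheduledExecutorService plays the role that ctx.executor() plays in the handler.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical plain-JDK stand-in for the per-connection scheduling in the handler.
class PerClientScheduler {

    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();
    private final Map<String, ScheduledFuture<?>> tasks = new ConcurrentHashMap<>();

    /** Plays the role of channelActive(): start one periodic task for this client. */
    void onConnect(String clientId, Runnable push, long periodMillis) {
        ScheduledFuture<?> sf = executor.scheduleAtFixedRate(
                push, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        ScheduledFuture<?> previous = tasks.put(clientId, sf);
        if (previous != null) {
            previous.cancel(false); // never keep two tasks for the same client
        }
    }

    /** Plays the role of channelInactive(): stop that client's task. */
    void onDisconnect(String clientId) {
        ScheduledFuture<?> sf = tasks.remove(clientId);
        if (sf != null) {
            sf.cancel(false);
        }
    }

    void shutdown() {
        executor.shutdownNow();
    }

    /** Returns {pushes counted shortly after disconnect, pushes counted a while later}. */
    static int[] demo() {
        PerClientScheduler scheduler = new PerClientScheduler();
        AtomicInteger sent = new AtomicInteger();
        try {
            scheduler.onConnect("client-1", sent::incrementAndGet, 10);
            Thread.sleep(200);                 // client stays connected for a while
            scheduler.onDisconnect("client-1");
            Thread.sleep(50);                  // let any in-flight run finish
            int atDisconnect = sent.get();
            Thread.sleep(150);                 // nothing should be sent any more
            return new int[] { atDisconnect, sent.get() };
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return new int[] { -1, -1 };
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] counts = demo();
        System.out.println("after disconnect: " + counts[0] + ", later: " + counts[1]);
    }
}
```

Both numbers printed by main should be equal, since the task is cancelled on disconnect. In the Netty version the map is unnecessary because each handler instance holds the ScheduledFuture for its own connection.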

Regarding java - Netty and ScheduledExecutorService, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/38393623/
