gpt4 book ai didi

java - java中从一个套接字线程向所有现有套接字线程广播数据

转载 作者:行者123 更新时间:2023-12-01 11:44:21 27 4
gpt4 key购买 nike

我有一个 ServerSocket,它监听特定地址和端口上的连接,并为每个连接(客户端)创建一个线程以实现有状态协议(protocol),现在每个线程(客户端)消息在它自己的线程中处理,这里问题是:

1.我如何从一个线程(客户端套接字)向所有线程发送数据(只是广播一条消息。),我看到两个线程之间存在某种阻塞队列,这里我有一对多线程用于广播,并且还有私有(private)消息的一对一模型。

2.我看到一个同步块(synchronized block)解决方案,用于在线程之间共享数据或资源,但在我的代码中,我不知道应该在哪里实现它以及为什么?

<小时/>

服务器的入口点,ServerSocket 在此初始化并监听:

public class ServerStarter {
//All Users that creates from threads adding here.
public static Hashtable<String,User> users = new Hashtable<String,User>();
//All Channels add here.
public static Hashtable<String,Channel> channels = new Hashtable<String,Channel>();
final BlockingQueue<String> queue = new LinkedBlockingQueue<String>();

public static void main(String args[]){
Config config = new Config();
ServerSocket Server = null;
try {
//server configs,from left to right is: PORT,BackLog,Address
Server = new ServerSocket(config.port, config.backlog,config.ServerIP);
} catch (IOException e) {
System.err.println(e);
}

while (true) {
Socket sock = null;
BufferedReader inFromClient = null;
try {
sock = Server.accept();
} catch (IOException e) {
if (Server != null && !Server.isClosed()) {
try {
Server.close();
} catch (IOException e1)
{
e1.printStackTrace(System.err);
}
}
System.err.println(e);
}
try {
inFromClient = new BufferedReader(new InputStreamReader(sock.getInputStream()));
} catch (IOException e) {
System.err.println(e);
}
//each clients run on it's own thread!
new SocketThread(sock,inFromClient).start();
}
}
}
<小时/>

客户端套接字线程创建的位置在这里:

public class SocketThread extends Thread {
Socket csocket;
BufferedReader inFromClient;

public SocketThread(Socket csocket, BufferedReader inFromClient) {
this.csocket = csocket;
this.inFromClient = inFromClient;
}

public void run() {
try {
String fromclient = inFromClient.readLine();
//some primary informations sent to server for further processing.
RequestHandler Reqhandler = new RequestHandler(this.getId(), fromclient);
System.out.println("=======================================");
while (true) {
fromclient = inFromClient.readLine();
IRCParser parser = new IRCParser(fromclient);
//if the primary info's are OK and nothing causes to kill the thread the clients go to the infinite loop for processing messages.
Reqhandler.Commandhandler(parser.getCommand(), parser.getParameters());
}
} catch (IOException e) {
//kill current thread!
currentThread().interrupt();
return;
}
}
}

如果类和其他数据不够,请告诉我添加更多代码和注释,tnx

最佳答案

如果我正确理解您的要求,您只需要一些将消息从一个客户端线程传递到其他客户端线程的东西。我认为你应该能够在这里使用观察者模式。

类似这样的东西 - (请注意,我已经从您的代码中删除了显示消息广播概念不需要的所有其他内容。您可能需要根据您的要求将其更改回来)。

public class ServerStarter {

private static final ServerStarter singleton = new ServerStarter();

private volatile boolean shutdown;
// thread pool executor
private final ExecutorService executorService = Executors.newCachedThreadPool();
// observable to notify client threads
private final Observable observable = new Observable();
// fair lock (can use unfair lock if message broadcasting order is not important)
private final Lock fairLock = new ReentrantLock(true);

private ServerStarter() {
}

public static ServerStarter getInstance() {
return singleton;
}


public static void main(String args[]) {
ServerSocket server = null;
try {
//server configs,from left to right is: PORT,BackLog,Address
server = new ServerSocket();
while (!ServerStarter.getInstance().isShutdown()) {
Socket sock = server.accept();
BufferedReader inFromClient = new BufferedReader(new InputStreamReader(sock.getInputStream()));
//each clients run on it's own thread!
SocketThread clientThread = new SocketThread(sock, inFromClient);
ServerStarter.getInstance().registerClientThread(clientThread);
ServerStarter.getInstance().startClientThread(clientThread);
}
} catch (IOException e) {
if (server != null) {
try {
server.close();
} catch (IOException e1) {
e1.printStackTrace();
}
}
e.printStackTrace();
}
}

public void shutdown() {
shutdown = true;
}

public boolean isShutdown() {
return shutdown;
}

public void startClientThread(SocketThread clientThread) {
executorService.submit(clientThread);
}

private void registerClientThread(SocketThread clientThread) {
observable.addObserver(clientThread);
}

public void notifyAllClients(final Object message) {
fairLock.lock();
try {
executorService.submit(new MessageBroadcaster(message));
} finally {
fairLock.unlock();
}
}

public void unregisterClientThread(SocketThread clientThread) {
fairLock.lock();
try {
observable.deleteObserver(clientThread);
} finally {
fairLock.unlock();
}
}

private class MessageBroadcaster implements Runnable {
private final Object message;

public MessageBroadcaster(Object message) {
this.message = message;
}

@Override
public void run() {
fairLock.lock();
try {
observable.notifyObservers(message);
} finally {
fairLock.unlock();
}
}
}
}

class SocketThread implements Runnable, Observer {
Socket clientSocket;
BufferedReader inFromClient;

public SocketThread(Socket clientSocket, BufferedReader inFromClient) {
this.clientSocket = clientSocket;
this.inFromClient = inFromClient;
}

public void run() {
try {
String fromClient;
while (!ServerStarter.getInstance().isShutdown() && (fromClient = inFromClient.readLine()) != null) {
// TODO...prepare message to broadcast
Object message = new Object();
ServerStarter.getInstance().notifyAllClients(message);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
ServerStarter.getInstance().unregisterClientThread(this);
}
}

@Override
public void update(Observable o, Object message) {
// TODO...handle the message
}
}

当客户端线程想要通知其他客户端线程时,它会异步使用 observable 来通知其他线程。 observable 将调用每个客户端线程的 update() 方法。

关于java - java中从一个套接字线程向所有现有套接字线程广播数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29273182/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com