gpt4 book ai didi

java - 服务器在用户之间交替而不是广播

转载 作者:行者123 更新时间:2023-12-03 09:17:45 25 4
gpt4 key购买 nike

我一直在开发一个消息系统,用户在该系统中输入服务器 IP/端口,然后该服务器接收消息并将它们转发给服务器上的所有其他用户。整个程序基于我从头开始重写的回显服务器,它为每个 server.accept() 套接字创建两个线程,一个用于接收消息,一个用于发送消息。这两个线程通过 DatagramPacket 系统连接,因此如果服务器从一个套接字接收到一条消息,它将把它发送回所有其他用户,因为他们的线程正在监听相同的东西,这就是我遇到问题的地方;一切正常,除了接收消息的用户按登录时间交替。

两个客户端连接时的问题示例:

客户端 #1 发送了 10 条消息:

0
1
2
3
4
5
6
7
8
9

服务器接收所有这些。

客户端 #1 接收:

1
3
5
7
9

客户端 #2 接收:

0
2
4
6
8

这是客户端的代码:

import java.io.*;
import java.util.*;
import java.net.*;
import java.awt.*;
import java.awt.event.*;
import javax.swing.*;

public class MessageClient {
public static void main(String[] args) {
System.out.println("Starting Message System...");
Scanner in = new Scanner(System.in);
MessageClient mc = new MessageClient();
String input;
System.out.println(":System Started, type help for help.");
System.out.print(":");
while (true) {
input = in.nextLine();
if (input.equalsIgnoreCase("HELP")) {
mc.printHelp();
System.out.print(":");
} else if (input.equalsIgnoreCase("QUIT")) {
System.exit(0);
} else if (input.equalsIgnoreCase("CONNECT")) {
mc.connect(in);
in.nextLine();
System.out.print(":");
} else {
System.out.print("No command found.\n:");
}
}
}
public static void printHelp() {
System.out.println("help\tShow this prompt\nconnect\tStarts a new connection\nquit\tQuit the program\nexit\tExit a connection");
}
public void connect(Scanner in) {
Socket soc = null;
InetAddress addr = null;
System.out.print("IP_ADDRESS/HOST:");
String ip = in.nextLine();
System.out.print("PORT:");
int port = in.nextInt();
try {
System.out.println("Attempting to connect to HOST:\'" + ip + "\' on PORT:\'" + port + "\'");
addr = InetAddress.getByName(ip);
soc = new Socket(addr, port);
} catch(Exception e) {
System.out.println("Error connecting to server: " + e.getLocalizedMessage());
return;
}
SwingUtilities.invokeLater(new MessageGUI(ip + ":" + port, soc));
}
}

class MessageGUI implements Runnable {
public MessageGUI(String windowName, Socket server) {
JFrame window = new JFrame(windowName);
window.setDefaultCloseOperation(JFrame.DISPOSE_ON_CLOSE);
window.setSize(500, 300);
window.setLayout(new BorderLayout());
window.setVisible(true);

MessageReceive mr = new MessageReceive(server);
mr.setEditable(false);
mr.setBackground(new Color(0, 0, 0));
mr.setForeground(new Color(0, 255, 0));
mr.setVisible(true);
new Thread(mr).start();
window.add(mr, BorderLayout.CENTER);

DataOutputStream dos = null;
try {
dos = new DataOutputStream(server.getOutputStream());
} catch(Exception e) {
System.out.println("Error creating output stream to server: " + e.getLocalizedMessage());
}

JTextField input = new JTextField();
input.addActionListener(new MessageSend(server, input, dos));
input.setBackground(new Color(0, 0, 0));
input.setForeground(new Color(0, 255, 0));
window.add(input, BorderLayout.PAGE_END);

System.out.println("Displaying connection.");
}
public void run() {}
}

class MessageReceive extends JTextArea implements Runnable {
protected Socket server;
public MessageReceive(Socket server) {
this.server = server;
}
public void run() {
DataInputStream dis = null;
int bytes;
try {
dis = new DataInputStream(server.getInputStream());
} catch(Exception e) {
System.out.println("Error connecting server: " + e.getLocalizedMessage());
}
this.append("Connected.\n");
while (true) {
try {
while ((bytes = dis.read()) != -1) this.append(String.valueOf((char) bytes));
} catch(Exception e) {
System.out.println("Error reading from server: " + e.getLocalizedMessage());
return;
}
}
}
}

class MessageSend implements ActionListener {
protected Socket server;
protected JTextField input;
protected DataOutputStream dos = null;
public MessageSend(Socket server, JTextField input, DataOutputStream dos) {
this.server = server;
this.input = input;
this.dos = dos;
}
public void actionPerformed(ActionEvent ae) {
try {
dos.writeBytes(input.getText() + "\n");
input.setText("");
} catch(Exception e) {
System.out.println("Error writing to server output stream: " + e.getLocalizedMessage());
}
}
}

服务器代码如下:

import java.io.*;
import java.net.*;
import java.util.*;

public class MessageServer {
public static void main(String[] args) {
int port = Integer.parseInt(args[0]);
MessageServer ms = new MessageServer();
System.out.println("Starting server on port " + port + "...");
ServerSocket ss = null;
try {
ss = new ServerSocket(port);
} catch(Exception e) {
System.out.println("Error creating server: " + e.getLocalizedMessage());
System.exit(0);
}
System.out.println("Created server port, now waiting for users...");
Socket client = null;
DatagramSocket ds = null;
try {
ds = new DatagramSocket(4);
} catch(Exception e) {
System.out.println("IN:Error creating Datagram Server: " + e.getLocalizedMessage());
e.printStackTrace();
System.exit(0);
}
while (true) {
try {
client = ss.accept();
System.out.println("Connecting user: " + client.getInetAddress().toString());
} catch(Exception e) {
System.out.println("Error on server: " + e.getLocalizedMessage());
}
new MessageConnectionIn(client, ds).start();
new MessageConnectionOut(client, ds).start();
}
}
}

class MessageConnectionOut extends Thread {
protected Socket client;
public DatagramSocket ds;
public MessageConnectionOut(Socket client, DatagramSocket ds) {
this.client = client;
this.ds = ds;
}
public void run() {
this.setName(client.getInetAddress().getHostAddress() + ":OUT");
try {
System.out.println("OUT:User connected.");
DataOutputStream dos = new DataOutputStream(client.getOutputStream());
while (true) {
byte[] outgoing = new byte[4096];
DatagramPacket dp = new DatagramPacket(outgoing, outgoing.length);
ds.receive(dp);
dos.writeChars(new String(outgoing) + "\n");
}
} catch(Exception e) {
System.out.println("OUT:Error connecting " + this.getName() + ": " + e.getLocalizedMessage());
return;
}
}
}

class MessageConnectionIn extends Thread {
protected Socket client;
public DatagramSocket ds;
public MessageConnectionIn(Socket client, DatagramSocket ds) {
this.client = client;
this.ds = ds;
}
public void run() {
this.setName(client.getInetAddress().getHostAddress() + ":IN");
try {
System.out.println("IN:User connected.");
BufferedReader br = new BufferedReader(new InputStreamReader(client.getInputStream()));
while (true) {
String lineIn = br.readLine();
byte[] input = lineIn.getBytes();
System.out.println(lineIn);
byte[] output = new byte[4096];
for (int c = 0; c < output.length; c++) output[c] = 0x0;
for (int i = 0; i < input.length && i < output.length; i++) output[i] = input[i];
DatagramPacket dp = new DatagramPacket(output, output.length, InetAddress.getLocalHost(), 4);
ds.send(dp);
}
} catch(Exception e) {
System.out.println("IN:Error connecting to " + this.getName() + ": " + e.getLocalizedMessage());
return;
}
}
}

更新:

我尝试用 MulticastSockets 替换所有 DatagramSockets,并在声明它时将其添加到一个组,MessageServer.main()。出现同样的问题。

组播代码:

public class MessageServer {
public static void main(String[] args) {
int port = Integer.parseInt(args[0]);
MessageServer msgsrv = new MessageServer();
System.out.println("Starting server on port " + port + "...");
ServerSocket ss = null;
try {
ss = new ServerSocket(port);
} catch(Exception e) {
System.out.println("Error creating server: " + e.getLocalizedMessage());
System.exit(0);
}
System.out.println("Created server port, now waiting for users...");
Socket client = null;
MulticastSocket ms = null;
try {
ms = new MulticastSocket(4);
ms.joinGroup(InetAddress.getByName("225.65.65.65"));
} catch(Exception e) {
System.out.println("IN:Error creating Datagram Server: " + e.getLocalizedMessage());
e.printStackTrace();
System.exit(0);
}
while (true) {
try {
client = ss.accept();
System.out.println("Connecting user: " + client.getInetAddress().toString());
} catch(Exception e) {
System.out.println("Error on server: " + e.getLocalizedMessage());
}
new MessageConnectionIn(client, ms).start();
new MessageConnectionOut(client, ms).start();
}
}
}

class MessageConnectionOut extends Thread {
protected Socket client;
public MulticastSocket ms;
public MessageConnectionOut(Socket client, MulticastSocket ms) {
this.client = client;
this.ms = ms;
}
public void run() {
this.setName(client.getInetAddress().getHostAddress() + ":OUT");
try {
System.out.println("OUT:User connected.");
DataOutputStream dos = new DataOutputStream(client.getOutputStream());
while (true) {
byte[] outgoing = new byte[4096];
DatagramPacket dp = new DatagramPacket(outgoing, outgoing.length);
ms.receive(dp);
dos.writeChars(new String(outgoing) + "\n");
System.out.println("SENT_TO:" + this.getName());
}
} catch(Exception e) {
System.out.println("OUT:Error connecting " + this.getName() + ": " + e.getLocalizedMessage());
return;
}
}
}

class MessageConnectionIn extends Thread {
protected Socket client;
public MulticastSocket ms;
public MessageConnectionIn(Socket client, MulticastSocket ms) {
this.client = client;
this.ms = ms;
}
public void run() {
this.setName(client.getInetAddress().getHostAddress() + ":IN");
try {
System.out.println("IN:User connected.");
BufferedReader br = new BufferedReader(new InputStreamReader(client.getInputStream()));
while (true) {
String lineIn = br.readLine();
byte[] input = lineIn.getBytes();
System.out.println(lineIn);
byte[] output = new byte[4096];
for (int c = 0; c < output.length; c++) output[c] = 0x0;
for (int i = 0; i < input.length && i < output.length; i++) output[i] = input[i];
DatagramPacket dp = new DatagramPacket(output, output.length, InetAddress.getLocalHost(), 4);
ms.send(dp);
}
} catch(Exception e) {
System.out.println("IN:Error connecting to " + this.getName() + ": " + e.getLocalizedMessage());
return;
}
}
}

最佳答案

此示例可能对您有所帮助。

服务器有 2 个线程。

  1. 一个用于读取 UDP 消息。我使用了 2 个不同的端口,因为我只想避免同一进程读取消息。我没有 2 台机器来测试它。在我的本地主机上测试。
  2. 另一个线程将广播读者线程收到的 UDP 消息。

有一个线程安全列表,它在线程之间充当数据同步。接收到的数据添加到列表中。广播线程轮询数据列表,如果有任何广播,则 hibernate 500 微秒。线程是使用执行器创建的。

private final static String INET_ADDR = "224.0.0.3";
private final static int PORT1 = 8888;
private final static int PORT2 = 8889;
private static List<String> threadSafeList = null;

public static void main(String[] args) throws UnknownHostException, InterruptedException {
threadSafeList = new CopyOnWriteArrayList<String>();
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.submit(new Sender(InetAddress.getByName(INET_ADDR), PORT1));
executorService.submit(new Receiver(InetAddress.getByName(INET_ADDR), PORT2));
executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
}

private static class Receiver implements Runnable {

private InetAddress addr;
private int port;

public Receiver (InetAddress inetAddress, int port) throws UnknownHostException {
this.addr = InetAddress.getByName(INET_ADDR);
this.port = port;
}

public void run() {
System.out.println(" @ Receiver ");
System.out.println(" @ Receiver " + this.port);
byte[] buf = new byte[256];

try {
MulticastSocket clientSocket = new MulticastSocket(this.port);
//Joint the Multicast group.
clientSocket.joinGroup(this.addr);

while (true) {
// Receive the information and print it.
DatagramPacket msgPacket = new DatagramPacket(buf, buf.length);
clientSocket.receive(msgPacket);

String msg = new String(buf, 0, buf.length);
System.out.println("Socket 1 received msg: " + msg);
threadSafeList.add(msg);
}
} catch (IOException ex) {
ex.printStackTrace();
}
}
}

private static class Sender implements Runnable {

private InetAddress addr;
private int port;

public Sender (InetAddress inetAddress, int port) throws UnknownHostException {
this.addr = InetAddress.getByName(INET_ADDR);
this.port = port;
}

public void run() {
System.out.println(" @ Sender Address " + new String(this.addr.getAddress()));
System.out.println(" @ Sender port " + this.port);
// Open a new DatagramSocket, which will be used to send the data.
while (true) {
try (DatagramSocket serverSocket = new DatagramSocket()) {
for (Iterator<String> it = threadSafeList.iterator(); !threadSafeList.isEmpty() && it.hasNext(); ) {

String i = it.next();
String msg = "Sent message no " + i;

// Create a packet that will contain the data
// (in the form of bytes) and send it.
DatagramPacket msgPacket = new DatagramPacket(msg.getBytes(), msg.getBytes().length, this.addr, this.port);
serverSocket.send(msgPacket);

threadSafeList.remove(i);
System.out.println("Server sent packet with msg: " + msg);
}
} catch (IOException ex) {
ex.printStackTrace();
}
try {
System.out.println("going for sleep");
Thread.currentThread().sleep(500);
System.out.println("going for sleeping");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

可以通过更改发送线程的创建来修改设计。每当接收方线程收到消息时,创建一个发送方线程并进行广播并关闭该线程。您可以使用可重用线程池而不是本示例中使用的固定线程池。您可以在创建发件人线程时将消息作为参数传递(因此可能根本不需要列表)并执行提交。我有代码。

    public static void main(String[] args) throws UnknownHostException,
InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.submit(new Receiver(InetAddress.getByName(INET_ADDR),
PORT2, executorService));
executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
}

和内部类,

    private static class Receiver implements Runnable {

private InetAddress addr;
private int port;
private ExecutorService executorService;

public Receiver(InetAddress inetAddress, int port,
ExecutorService executorService) throws UnknownHostException {
this.addr = InetAddress.getByName(INET_ADDR);
this.port = port;
this.executorService = executorService;
}

public void run() {
System.out.println(" @ Receiver ");
System.out.println(" @ Receiver " + this.port);
byte[] buf = new byte[256];

try {
MulticastSocket clientSocket = new MulticastSocket(this.port);
// Joint the Multicast group.
clientSocket.joinGroup(this.addr);

while (true) {
// Receive the information and print it.
DatagramPacket msgPacket = new DatagramPacket(buf,
buf.length);
clientSocket.receive(msgPacket);

String msg = new String(buf, 0, buf.length);
System.out.println("Socket 1 received msg: " + msg);
executorService.submit(new Sender(InetAddress
.getByName(INET_ADDR), PORT1, msg));
}
} catch (IOException ex) {
ex.printStackTrace();
}
}
}

    private static class Sender implements Runnable {

private InetAddress addr;
private int port;
private String message;

public Sender(InetAddress inetAddress, int port, String message)
throws UnknownHostException {
this.addr = InetAddress.getByName(INET_ADDR);
this.port = port;
this.message = message;
}

public void run() {
System.out.println(" @ Sender Address "
+ new String(this.addr.getAddress()));
System.out.println(" @ Sender port " + this.port);
try {
DatagramSocket serverSocket = new DatagramSocket();
String msg = "Sent message no " + message;

// Create a packet that will contain the data
// (in the form of bytes) and send it.
DatagramPacket msgPacket = new DatagramPacket(msg.getBytes(),
msg.getBytes().length, this.addr, this.port);
serverSocket.send(msgPacket);

System.out.println("Server sent packet with msg: " + msg);
} catch (IOException ex) {
ex.printStackTrace();
}
}
}

客户端有2个线程,

  1. 一个用于阅读广播消息。
  2. 另一个用于循环发送 5 条消息。一旦完成,线程将关闭。

这里没有数据交换,所以没有线程安全列表。

    private static class Receiver implements Runnable {

private InetAddress addr;
private int port;

public Receiver(InetAddress inetAddress, int port)
throws UnknownHostException {
this.addr = InetAddress.getByName(INET_ADDR);
this.port = port;
}

public void run() {
System.out.println(" @ Receiver ");
System.out.println(" @ Receiver port " + this.port);
byte[] buf = new byte[256];

try (MulticastSocket clientSocket = new MulticastSocket(this.port)) {
// Joint the Multicast group.
clientSocket.joinGroup(this.addr);
while (true) {
// Receive the information and print it.
DatagramPacket msgPacket = new DatagramPacket(buf,
buf.length);
clientSocket.receive(msgPacket);

String msg = new String(buf, 0, buf.length);
System.out.println("Socket 1 received msg: " + msg);
}
} catch (IOException ex) {
ex.printStackTrace();
}
}
}

    private static class Sender implements Runnable {

private InetAddress addr;
private int port;

public Sender(InetAddress inetAddress, int port)
throws UnknownHostException {
this.addr = InetAddress.getByName(INET_ADDR);
this.port = port;
}

public void run() {
System.out.println(" @ Sender Address "
+ new String(this.addr.getAddress()));
System.out.println(" @ Sender port " + this.port);
// Open a new DatagramSocket, which will be used to send the data.
try {
DatagramSocket serverSocket = new DatagramSocket();

for (int i = 0; i < 5; i++) {

System.out.println("inside loop");
String msg = "Sent message no 2" + i;

// Create a packet that will contain the data
// (in the form of bytes) and send it.
DatagramPacket msgPacket = new DatagramPacket(
msg.getBytes(), msg.getBytes().length, this.addr,
this.port);
System.out.println("Before sending to socket");
serverSocket.send(msgPacket);

System.out.println("Server sent packet with msg: " + msg);
}
} catch (IOException ex) {
ex.printStackTrace();
}
}
}

This article sample code is extended further.

待微调的代码。

关于java - 服务器在用户之间交替而不是广播,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30769508/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com