gpt4 book ai didi

java - Java在多个阻止程序上进行阻止

转载 作者:搜寻专家 更新时间:2023-11-01 03:40:23 25 4
gpt4 key购买 nike

我不是一个非常有经验的Java程序员,所以如果这是一个新手问题,请原谅我。

我正在设计一个包含3个模块的系统。客户端,服务器和应用程序。想法是客户端将消息发送到服务器。服务器触发应用程序中的用例。用例的结果返回给服务器,服务器将结果发送给客户端。我之所以选择这种架构,是因为我希望同时支持多个客户端,我希望能够在其他应用程序中重用服务器模块,我希望使负责管理客户端连接的代码与该代码脱钩。尽可能地实现域逻辑,并且由于有机会学习一些更高级的Java。

我打算将各种模块与队列 bundle 在一起。客户端足够简单。发出消息并阻止,直到收到响应为止(这可能过于简化了,但是目前这是一个合理的模型)。该应用程序同样不是问题。它在输入队列中阻塞,在收到有效消息时执行用例,并将结果推送到输出队列。有多个客户会使事情变得有些棘手,但我的经验水平仍在我的掌握范围内。服务器为每个打开的连接维护线程,并且服务器出站/应用程序入站队列是同步的,因此,如果一次有2条消息到达,则第二个线程将只需要稍等片刻,等待第一个线程将其有效负载传递到队列中。

问题出在服务器的中间部分,这将不得不阻止两个独立的事物。服务器正在监视客户端和应用程序的输出队列(用作服务器的输入队列)。服务器需要阻塞,直到有消息从客户端传入(然后将其转发到应用程序),或者直到应用程序完成任务并将结果推送到应用程序出站/服务器入站队列中为止。

据我所知,Java只能阻止一件事。

在客户端发送消息或服务器入站队列不再为空之前,是否有可能使服务器阻塞?

更新:

我花了一些时间来解决这个问题,并设法将问题减少到可以说明问题的最低限度。即使进行了修整,也要遵循一些笨重的代码转储,因此深表歉意。我会尽力将其分解。

这是服务器的代码:

public class Server implements Runnable {

private int listenPort = 0;
private ServerSocket serverSocket = null;
private BlockingQueue<Message> upstreamMessaes = null;
private BlockingQueue<Message> downstreamMessages = null;
private Map<Integer, Session> sessions = new ConcurrentHashMap ();
private AtomicInteger lastId = new AtomicInteger ();

/**
* Start listening for clients to process
*
* @throws IOException
*/
@Override
public void run () {
int newSessionId;
Session newSession;
Thread newThread;

System.out.println (this.getClass () + " running");

// Client listen loop
while (true) {
newSessionId = this.lastId.incrementAndGet ();
try {
newSession = new Session (this, newSessionId);
newThread = new Thread (newSession);
newThread.setName ("ServerSession_" + newSessionId);
this.sessions.put (newSessionId, newSession);
newThread.start ();
} catch (IOException ex) {
Logger.getLogger (Server.class.getName ()).log (Level.SEVERE, null, ex);
}
}
}

/**
* Accept a connection from a new client
*
* @return The accepted Socket
* @throws IOException
*/
public Socket accept () throws IOException {
return this.getSocket().accept ();
}

/**
* Delete the specified Session
*
* @param sessionId ID of the Session to remove
*/
public void deleteSession (int sessionId) {
this.sessions.remove (sessionId);
}

/**
* Forward an incoming message from the Client to the application
*
* @param msg
* @return
* @throws InterruptedException
*/
public Server messageFromClient (Message msg) throws InterruptedException {
this.upstreamMessaes.put (msg);
return this;
}

/**
* Set the port to listen to
*
* We can only use ports in the range 1024-65535 (ports below 1024 are
* reserved for common protocols such as HTTP and ports above 65535 don't
* exist)
*
* @param listenPort
* @return Returns itself so further methods can be called
* @throws IllegalArgumentException
*/
public final Server setListenPort (int listenPort) throws IllegalArgumentException {
if ((listenPort > 1023) && (listenPort <= 65535)) {
this.listenPort = listenPort;
} else {
throw new IllegalArgumentException ("Port number " + listenPort + " not valid");
}

return this;
}

/**
* Get the server socket, initialize it if it isn't already started.
*
* @return The object's ServerSocket
* @throws IOException
*/
private ServerSocket getSocket () throws IOException {
if (null == this.serverSocket) {
this.serverSocket = new ServerSocket (this.listenPort);
}

return this.serverSocket;
}

/**
* Instantiate the server
*
* @param listenPort
* @throws IllegalArgumentException
*/
public Server ( int listenPort,
BlockingQueue<Message> incomingMessages,
BlockingQueue<Message> outgoingMessages) throws IllegalArgumentException {
this.setListenPort (listenPort);
this.upstreamMessaes = incomingMessages;
this.downstreamMessages = outgoingMessages;
System.out.println (this.getClass () + " created");
System.out.println ("Listening on port " + listenPort);
}
}

我相信以下方法属于服务器,但目前已被注释掉。
    /**
* Notify a Session of a message for it
*
* @param sessionMessage
*/
public void notifySession () throws InterruptedException, IOException {
Message sessionMessage = this.downstreamMessages.take ();
Session targetSession = this.sessions.get (sessionMessage.getClientID ());
targetSession.waitForServer (sessionMessage);
}

这是我的类(class)
public class Session implements Runnable {
private Socket clientSocket = null;
private OutputStreamWriter streamWriter = null;
private StringBuffer outputBuffer = null;
private Server server = null;
private int sessionId = 0;

/**
* Session main loop
*/
@Override
public void run () {

StringBuffer inputBuffer = new StringBuffer ();
BufferedReader inReader;

try {
// Connect message
this.sendMessageToClient ("Hello, you are client " + this.getId ());
inReader = new BufferedReader (new InputStreamReader (this.clientSocket.getInputStream (), "UTF8"));

do {
// Parse whatever was in the input buffer
inputBuffer.delete (0, inputBuffer.length ());
inputBuffer.append (inReader.readLine ());
System.out.println ("Input message was: " + inputBuffer);
this.server.messageFromClient (new Message (this.sessionId, inputBuffer.toString ()));
} while (!"QUIT".equals (inputBuffer.toString ()));

// Disconnect message
this.sendMessageToClient ("Goodbye, client " + this.getId ());
} catch (IOException | InterruptedException e) {
Logger.getLogger (Session.class.getName ()).log (Level.SEVERE, null, e);
} finally {
this.terminate ();
this.server.deleteSession (this.getId ());
}
}

/**
*
* @param msg
* @return
* @throws IOException
*/
public Session waitForServer (Message msg) throws IOException {
// Generate a response for the input
String output = this.buildResponse (msg.getPayload ()).toString ();
System.out.println ("Output message will be: " + output);

// Output to client
this.sendMessageToClient (output);

return this;
}

/**
*
* @param request
* @return
*/
private StringBuffer buildResponse (CharSequence request) {
StringBuffer ob = this.outputBuffer;
ob.delete (0, this.outputBuffer.length ());

ob.append ("Server repsonded at ")
.append (new java.util.Date ().toString () )
.append (" (You said '" )
.append (request)
.append ("')");

return this.outputBuffer;
}

/**
* Send the given message to the client
*
* @param message
* @throws IOException
*/
private void sendMessageToClient (CharSequence message) throws IOException {
// Output to client
OutputStreamWriter osw = this.getStreamWriter ();
osw.write ((String) message);
osw.write ("\r\n");
osw.flush ();
}

/**
* Get an output stream writer, initialize it if it's not active
*
* @return A configured OutputStreamWriter object
* @throws IOException
*/
private OutputStreamWriter getStreamWriter () throws IOException {
if (null == this.streamWriter) {
BufferedOutputStream os = new BufferedOutputStream (this.clientSocket.getOutputStream ());
this.streamWriter = new OutputStreamWriter (os, "UTF8");
}

return this.streamWriter;
}

/**
* Terminate the client connection
*/
private void terminate () {
try {
this.streamWriter = null;
this.clientSocket.close ();
} catch (IOException e) {
Logger.getLogger (Session.class.getName ()).log (Level.SEVERE, null, e);
}
}

/**
* Get this Session's ID
*
* @return The ID of this session
*/
public int getId () {
return this.sessionId;
}

/**
* Session constructor
*
* @param owner The Server object that owns this session
* @param sessionId The unique ID this session will be given
* @throws IOException
*/
public Session (Server owner, int sessionId) throws IOException {
System.out.println ("Class " + this.getClass () + " created");

this.server = owner;
this.sessionId = sessionId;
this.clientSocket = this.server.accept ();

System.out.println ("Session ID is " + this.sessionId);
}
}

测试应用程序是相当基本的,它只是回显原始请求消息的修改版本。真正的应用程序将在收到消息并向服务器返回有意义的响应时进行工作。
public class TestApp implements Runnable {
private BlockingQueue <Message> inputMessages, outputMessages;

@Override
public void run () {
Message lastMessage;
StringBuilder returnMessage = new StringBuilder ();

while (true) {
try {
lastMessage = this.inputMessages.take ();

// Construct a response
returnMessage.delete (0, returnMessage.length ());
returnMessage.append ("Server repsonded at ")
.append (new java.util.Date ().toString () )
.append (" (You said '" )
.append (lastMessage.getPayload ())
.append ("')");

// Pretend we're doing some work that takes a while
Thread.sleep (1000);

this.outputMessages.put (new Message (lastMessage.getClientID (), lastMessage.toString ()));
} catch (InterruptedException ex) {
Logger.getLogger (TestApp.class.getName ()).log (Level.SEVERE, null, ex);
}
}
}

/**
* Initialize the application
*
* @param inputMessages Where input messages come from
* @param outputMessages Where output messages go to
*/
public TestApp (BlockingQueue<Message> inputMessages, BlockingQueue<Message> outputMessages) {
this.inputMessages = inputMessages;
this.outputMessages = outputMessages;
System.out.println (this.getClass () + " created");
}
}

Message类非常简单,仅包含一个原始客户端ID和一个有效负载字符串,因此我省略了它。

最后,主类看起来像这样。
public class Runner {
/**
*
* @param args The first argument is the port to listen on.
* @throws Exception
*/
public static void main (String[] args) throws Exception {
BlockingQueue<Message> clientBuffer = new LinkedBlockingQueue ();
BlockingQueue<Message> appBuffer = new LinkedBlockingQueue ();
TestApp appInstance = new TestApp (clientBuffer, appBuffer);
Server serverInstance = new Server (Integer.parseInt (args [0]), clientBuffer, appBuffer);
Thread appThread = new Thread (appInstance);
Thread serverThread = new Thread (serverInstance);

appThread.setName("Application");
serverThread.setName ("Server");

appThread.start ();
serverThread.start ();

appThread.join ();
serverThread.join ();

System.exit (0);
}
}

尽管实际的应用程序将更加复杂,但TestApp却说明了基本的使用模式。它阻塞输入队列,直到那里有东西,对其进行处理,然后将结果压入其输出队列。

session 类管理特定客户端与服务器之间的实时连接。它从客户端获取输入并将其转换为Message对象,并从Server获取Message对象并将其转换为输出以发送给客户端。

服务器监听新的传入连接,并为其具有的每个传入连接设置一个Session对象。当 session 将消息传递给它时,它将消息放入其上游队列中,以供应用程序处理。

我遇到的困难是使返回消息从TestApp返回到各个客户端。当来自客户端的消息进入时, session 会生成一条消息并将其发送到服务器,然后服务器将其放入其上游队列,该队列也是TestApp的输入队列。作为响应,TestApp生成响应消息并将其放入输出队列,该队列也是服务器的下游队列。

这意味着 session 需要等待两个不相关的事件。他们应该封锁直到
  • 输入来自客户端(客户端套接字上的BufferedReader具有要处理的输入),
  • 或服务器发送一条消息给它(服务器在 session 上调用WaitForServer()方法)
    至于服务器本身,它也必须等待两个不相关的事件。
  • session 调用messageFromClient()并将消息传递给TestApp,
  • 或TestApp将消息推送到输出/下游队列中,以传递到 session 中。

  • 从表面上看,这看起来像是一项简单的任务,事实证明,这比我最初想象的要困难得多。我希望我能忽略一些显而易见的事情,因为我对并发编程还是很陌生,但是如果您能指出我要去哪里的地方,我将不胜感激。

    最佳答案

    因为您的实现使用的是在客户端 session 服务器之间传递数据的方法,所以实际上您已经解决了紧迫的问题。但是,这可能不是您的意图。这是正在发生的事情:

    session 的run方法正在其自己的线程中运行,并在套接字上阻塞。当服务器调用waitForServer时,此方法立即在服务器线程中执行-在Java中,如果线程调用了某个方法,则该方法在该线程中执行,因此在这种情况下, session 无需取消阻止。为了创建您要解决的问题,您需要删除waitForServer方法并将其替换为BlockingQueue messagesFromServer队列-然后,服务器会将消息放入此队列中,而Session将需要在其上进行阻塞,从而导致Session需要阻塞两个不同的对象(套接字和队列)。

    假设您切换到需要在两个对象上阻塞Session的实现,我认为您可以通过在注释中描述的两种方法的混合来解决此问题:

    每个 session 的套接字都需要一个线程来阻塞它-我看不到任何解决方法,除非您愿意将其替换为一个固定线程池(例如4个线程),该线程池会轮询该套接字并 hibernate 一些如果没有什么可读取的,则为十毫秒。

    您可以使用一个队列和一个阻塞该线程的线程来管理所有服务器-> session 流量-服务器在其有效负载中包含 session “地址”,以便阻塞在其上的线程知道如何处理该消息。如果您发现在进行大量 session 时无法扩展,则可以随时增加线程/队列计数,例如在32个 session 中,您可以有4个线程/队列,每个线程/队列8个 session 。

    关于java - Java在多个阻止程序上进行阻止,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/16234577/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com