gpt4 book ai didi

java - 使用单个 TCP/IP 套接字从多个并发线程发送数据的问题

转载 作者:可可西里 更新时间:2023-11-01 02:43:50 27 4
gpt4 key购买 nike

在浏览了一些关于我的问题的其他线程后,我想我明白我需要重新设计我的应用程序。但只是为了澄清:我在客户端和服务器之间有一个 TCP/IP 连接。在客户端有多个线程同时运行。这些线程中的一个或多个随机使用 TCP/IP 连接与服务器通信。我发现了,e。 G。当长时间运行的文件传输处于 Activity 状态时,同时使用与另一个线程的连接可能会导致错误。尽管我在每条消息之前都添加了一个特定的 header ,其中包括数据长度,但在我看来,IP 堆栈有时会向我的程序传送多个消息的混合,这意味着尽管一条消息具有net 尚未完全传送,另一条消息的一部分传送到我的读取方法。这是符合预期的 TCP/IP 行为的正确观察吗?提前致谢 - 马里奥

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

对于任何感兴趣的人:以下是我的测试程序的源代码。您可以使用不同的 BUFFER_SIZE 值和 THREADS 的数量,这些值用于使用同一套接字通过并发 TCP/IP 发送来轰炸服务器套接字。我省略了一些错误处理并删除了更复杂的终止,包括关闭套接字。使用大于 64KB 的 BUFFER_SIZE 进行测试总是会导致我的机器出错。

import java.io.*;
import java.net.*;
import java.nio.ByteBuffer;

public class TCPTest
{
private final static String INPUT_FILE = "c:/temp/tcptest.in";
private final static int BUFFER_SIZE = 64 * 1024 - 8; //65536;
private final static int MESSAGE_SIZE = 512 * 64 * 1024;
private final static int THREADS = 3;
private final static int SIZE_OF_INT = 4;
private final static int LENGTH_SIZE = SIZE_OF_INT;
private final static int ID_SIZE = SIZE_OF_INT;
private final static int HEADER_SIZE = LENGTH_SIZE + ID_SIZE;
private final static String NEW_LINE = System.getProperty("line.separator");

private ServerSocket m_serverSocket = null;
private Socket m_clientSocket = null;
private int m_iThreadCounter;

public static void main(String[] args)
{
new TCPTest();
} // main

public TCPTest()
{
final String id = "ReaderThread[*]";
// start a new thread creating a server socket waiting for connections
new Thread(new Runnable()
{
public void run()
{
try
{
// create server socket and accept client requests
m_serverSocket = new ServerSocket(12345);
m_clientSocket = m_serverSocket.accept();
// client request => prepare and read data
long startTime = System.currentTimeMillis();
byte[] buffer = new byte[BUFFER_SIZE];
ByteBuffer header = ByteBuffer.allocate(HEADER_SIZE);
int iTotalBytesRead = 0;
boolean fTerminate = false;
int iBytesRead;
// get hold of socket's input stream
InputStream clientInputStream = m_clientSocket.getInputStream();
// loop
while (false == fTerminate)
{
// loop to read next header
for (int i = 0; i < HEADER_SIZE; i++)
clientInputStream.read(header.array(), i, 1);
header.rewind();
// get information of interest
int iLength = header.getInt();
int iId = header.getInt();
int iLengthSoFar = 0;
int iBytesLeft = iLength;
int iBytesToRead;
// any length given?
if ((0 < iLength) && (BUFFER_SIZE >= iLength))
{
// that's the case => read complete message
while (iLengthSoFar < iLength)
{
// calculate number of bytes left
iBytesLeft = iLength - iLengthSoFar;
// calculate maximum number of bytes to read
if (iBytesLeft > BUFFER_SIZE)
iBytesToRead = BUFFER_SIZE;
else
iBytesToRead = iBytesLeft;
// read next portion of bytes
if ((iBytesRead = clientInputStream.read(buffer, 0, iBytesToRead)) != -1)
{
// maintain statistics
iTotalBytesRead += iBytesRead;
iLengthSoFar += iBytesRead;
} // if
else
{
// finish => print message
System.out.println("==> "+id+": ERROR length=<-1> received " +
"for id=<"+iId+">");
fTerminate = true;
break;
} // else
} // while
} // if
else
{
System.out.println("==> "+id+": ERROR data length <= 0 for id=<"+iId+">");
dump(header, 0, HEADER_SIZE / SIZE_OF_INT, "Error header");
} // else
} // while
System.out.println("==> "+id+": "+ iTotalBytesRead + " bytes read in "
+ (System.currentTimeMillis() - startTime) + " ms.");
} // try
catch (IOException e)
{
e.printStackTrace();
} // catch
} // run
}).start();
// create the socket writer threads
try
{
// ensure server is brought up and request a connection
Thread.sleep(1000);
System.out.println("==> "+id+": just awoke");
Socket socket = new Socket("localhost", 12345);
OutputStream socketOutputStream = socket.getOutputStream();
System.out.println("==> "+id+": socket obtained");
// create some writer threads
for (int i = 0; i < THREADS; i++)
// create a new socket writer and start the thread
(new SocketWriter(socket,
(i+1),
BUFFER_SIZE,
new String("WriterThread["+(i+1)+"]"),
socketOutputStream)).start();
} // try
catch (Exception e)
{
e.printStackTrace();
} // catch
} // TCPTestEx

private final static void dump(ByteBuffer bb, int iOffset, int iInts, String header)
{
System.out.println(header);
bb.rewind();
for (int i = 0; i < iInts; i++)
System.out.print(" " + Integer.toHexString(bb.getInt()).toUpperCase());
System.out.print(NEW_LINE);
} // dump

private class SocketWriter extends Thread
{
Socket m_socket;
int m_iId;
int m_iBufferSize;
String m_id;
OutputStream m_os;

protected SocketWriter(Socket socket, int iId, int iBufferSize, String id, OutputStream os)
{
m_socket = socket;
m_iId = iId;
m_iBufferSize = iBufferSize;
m_id = id;
m_os = os;
// increment thread counter
synchronized (m_serverSocket)
{
m_iThreadCounter++;
} // synchronized
} // SocketWriter

public final void run()
{
try
{
long startTime = System.currentTimeMillis();
ByteBuffer buffer = ByteBuffer.allocate(m_iBufferSize + HEADER_SIZE);
int iTotalBytesRead = 0;
int iNextMessageSize = 512 * m_iBufferSize;
int iBytesRead;
// open input stream for file to read and send
FileInputStream fileInputStream = new FileInputStream(INPUT_FILE);
System.out.println("==> "+m_id+": file input stream obtained");
// loop to read complete file
while (-1 != (iBytesRead = fileInputStream.read(buffer.array(), HEADER_SIZE, m_iBufferSize)))
{
// add length and id to buffer and write over TCP
buffer.putInt(0, iBytesRead);
buffer.putInt(LENGTH_SIZE, m_iId);
m_os.write(buffer.array(), 0, HEADER_SIZE + iBytesRead);
// maintain statistics and print message if so desired
iTotalBytesRead += iBytesRead;
if (iNextMessageSize <= iTotalBytesRead)
{
System.out.println("==> "+m_id+": <"+iTotalBytesRead+"> bytes processed");
iNextMessageSize += MESSAGE_SIZE;
} // if
} // while
// close my file input stream
fileInputStream.close();
System.out.println("==> "+m_id+": file input stream closed");
System.out.println("==> "+m_id+": <"+ iTotalBytesRead + "> bytes written in "
+ (System.currentTimeMillis() - startTime) + " ms.");
// decrement thread counter
synchronized (m_serverSocket)
{
m_iThreadCounter--;
// last thread?
if (0 >= m_iThreadCounter)
// that's the case => terminate
System.exit(0);
} // synchronized
} // try
catch (Exception e)
{
e.printStackTrace();
} // catch
} // run
} // SocketWriter
} // TCPTest

最佳答案

嗯。 TCP 是一种面向字节的流协议(protocol)。这意味着应用程序接收到一个(未定界的)字节流。 “消息”的概念应由应用程序提供(或改用面向消息的协议(protocol))。

关于java - 使用单个 TCP/IP 套接字从多个并发线程发送数据的问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/14939760/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com