gpt4 book ai didi

java - 谁能解释 Windows (7 Pro N) 和 Linux ( Ubuntu 12-04) 之间的套接字吞吐量差异

转载 作者:可可西里 更新时间:2023-11-01 10:49:15 24 4
gpt4 key购买 nike

在评估 3rd 方软件(使用 NIO 的 Java 框架)时,我们发现该框架在 Windows 上的吞吐量约为 Linux 上的 50%。

假设有一些影响 Windows 的 JVM 或操作系统设置,我们将开始测试跨两个平台的简单计算(Fibonacci、heapsort、strcat 等)和对象构建。在所有情况下,操作系统都差不多。然而,当我们使用简单的 ServerSocket 和 Client Socket (java.net jdk 1.7u5) 执行吞吐量测试时,我们注意到 Linux 吞吐量可以高达 Windows 的 10 倍,尤其是对于小消息(100 字节)。

我们的直接假设是操作系统套接字发送/接收缓冲区大小不同,并且它们在吞吐量中发挥着重要作用。或者这与在一个操作系统上启用 TcpNoDelay 而不是另一个有关。

操作系统套接字选项默认值为 ...

Linux: rcvBuf: 43690, sndBuf:86700, TcpNoDelay: No (Nagle On)
Windows7: rcvBuf: 8192, sndBuf:8192, TcpNoDelay: No (Nagle On)

结果如下...

Throughput Test Results

这些结果引发了许多问题,尽管以下是最具挑战性的(我根本无法解释)和最直接的问题,因为当前的目标平台是 Windows。
  • 为什么不管缓冲区大小如何,100 字节消息的 Windows 时间(使用 Nagle On)几乎比 Linux 慢 10 倍
  • 为什么 100 字节消息的所有 Windows 时间(无论 Nagle 设置或缓冲区大小如何)大约为 7000 毫秒
  • 为什么所有的 Windows 时间都是 100 字节的消息(无论 Nagle 设置或缓冲区大小如何)都比它们的 1000 字节等值慢。我怀疑这与有效使用缓冲和/或 Nagling On/Off 有关,尽管我并不完全理解。

  • 我们在 Windows 上执行了额外的测试,我们从 50 字节开始消息大小,并在每 100 万条消息后增加 100 字节。结果很奇怪(但可以预测);在 50 到 400-450 字节之间,时间是 ~7000 毫秒,从 450-1000 字节,它们的范围是 ~5500 毫秒 - 7000 毫秒。一遍又一遍地重复这个测试显示了相同的模式。这就像需要将小消息填充到特定大小(可能是 MTU 或其他约束)。我们再次无法解释这种行为。

    任何解释或方向将不胜感激。请不要将其简化为 Windows v Linux 主题,改天再说吧。

    服务器代码
    public class SimpleMultiClientServer implements Runnable {

    private static ExecutorService threadPool = null;
    public static void main(String[] args) throws IOException {

    //ExecutorService threadPool;
    AppConfiguration configurations = null;
    if ((configurations = AppConfiguration.readAppConfiguration(args)) == null )
    System.exit(1);

    //create thread pool and execute
    threadPool = Executors.newCachedThreadPool();
    threadPool.execute( new SimpleMultiClientServer(configurations) );

    System.out.println("Hit any key to exit ....");
    System.in.read();

    //stop listening
    threadPool.shutdown();
    }

    //------------------------------------------------------------------------------

    private ServerSocket theServerSocket;
    private boolean listening = true;
    private AppConfiguration theConfigurations;

    private SimpleMultiClientServer(AppConfiguration configurations) {

    this.theConfigurations = configurations;
    }

    @Override
    public void run() {

    try {
    theServerSocket = new ServerSocket(theConfigurations.port);
    System.out.println(String.format("ServerSocket listening on port [%d]", theConfigurations.port));

    if ( theConfigurations.printSocketDetails )
    printSocketDetials();
    setServerSocketConditioning();

    } catch (IOException e) {
    System.err.println(String.format("Could not listen on port: %d.", theConfigurations.port));
    System.exit(-1);
    }

    //TODO interrupt the listending thread in order to shutdown
    while (listening) {
    try {
    threadPool.execute( new SimpleMultiClientServerThread( theServerSocket.accept() ) );
    System.out.println("Accept new client on socket ....");
    } catch (IOException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
    }
    }

    try {
    theServerSocket.close();
    } catch (IOException e) {
    e.printStackTrace();
    }
    }

    private static class SimpleMultiClientServerThread implements Runnable {

    private Socket theSocket = null;
    @SuppressWarnings("unused")
    private long messageCount = 0;

    public SimpleMultiClientServerThread(Socket socket) {

    this.theSocket = socket;
    }

    public void run() {

    PrintWriter out = null;
    BufferedReader in = null;

    try {
    out = new PrintWriter(theSocket.getOutputStream(), true);
    in = new BufferedReader(new InputStreamReader( theSocket.getInputStream() ));

    @SuppressWarnings("unused")
    String inputLine;

    while ((inputLine = in.readLine()) != null) {

    messageCount++;

    //TODO client should send shutdown message or end of stream marker on closing
    }
    } catch (IOException e) {
    e.printStackTrace();
    } finally {

    try {

    if ( null != out )
    out.close();
    if ( null != in )
    in.close();
    if ( null != theSocket )
    theSocket.close();
    } catch (Exception e) {
    e.printStackTrace();
    }
    }
    }
    }

    private void printSocketDetials() {

    //write to file
    FileOutputStream fos = null;
    PrintWriter pw = null;
    try {

    fos = new FileOutputStream("socketserver-details.log", true);
    pw = new PrintWriter(fos);
    pw.println("Socket (Server) details ....");

    try {

    if ( null != this.theServerSocket ) {
    pw.println(String.format("isBound [%b]", this.theServerSocket.isBound()));
    //SO_RCVBUF
    pw.println(String.format("getReceiveBufferSize [%d]", this.theServerSocket.getReceiveBufferSize()));
    //SO_REUSEADDR
    pw.println(String.format("getReuseAddress [%b]", this.theServerSocket.getReuseAddress()));
    //SO_TIMEOUT
    pw.println(String.format("getSoTimeout [%d]", this.theServerSocket.getSoTimeout()));
    pw.println("----------------------------------------------------------");
    }
    } catch (Exception e) {
    e.printStackTrace();
    }

    System.out.println();
    } catch (FileNotFoundException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
    } finally {

    if ( null != pw ) {
    pw.flush();
    pw.close();

    if ( null != fos )
    pw.close();
    }
    }
    }

    private void setServerSocketConditioning() {

    try {

    if ( theConfigurations.rcvBufSpecified )
    theServerSocket.setReceiveBufferSize(theConfigurations.rcvBuf);

    if ( ( theConfigurations.rcvBufSpecified ) &&
    theConfigurations.printSocketDetails ) {
    printSocketDetials();
    }

    } catch (Exception e) {
    e.printStackTrace();
    }

    }

    private static class AppConfiguration {

    public int port;

    //socket conditioning
    public boolean printSocketDetails;

    public int rcvBuf;
    public boolean rcvBufSpecified;

    public static AppConfiguration readAppConfiguration(String[] args) {

    AppConfiguration configurations = new AppConfiguration();

    try {

    configurations.port = Integer.parseInt(args[0]);

    if ( args.length > 1 ) {

    String[] socketConditioningDetails = args[1].split("\\|");

    configurations.printSocketDetails = Integer.parseInt(socketConditioningDetails[0]) == 1 ? true : false;

    if ( socketConditioningDetails.length > 1 ) {
    configurations.rcvBuf = Integer.parseInt(socketConditioningDetails[1]);
    configurations.rcvBufSpecified = true;
    }
    }
    } catch (Exception e) {
    System.out.println(String.format("Exception caught while parsin app configurations, %s", e.getMessage()));
    return null;
    }

    return configurations;
    }

    }
    }

    客户代码...
    public class SimpleSocketClient implements Runnable {

    public static void main(String[] args) throws IOException {

    AppConfiguration configurations = null;
    if ((configurations = AppConfiguration.readAppConfiguration(args)) == null )
    System.exit(1);

    if ( configurations.iterateThruByteRange ) {

    //set starting message size
    configurations.msgSize = configurations.startingMsgSize;
    for (int i = 0; i < configurations.numIterations; i++) {

    SimpleSocketClient client = new SimpleSocketClient(configurations);
    client.run();

    if ( configurations.msgSizeIncrementSpecified )
    configurations.msgSize += configurations.msgSizeIncrement;
    else //double message size for next iteration
    configurations.msgSize *= 2;

    if ( configurations.reduceMsgSize )
    configurations.msgCount = Math.max(1000, configurations.msgCount / 2);

    try {
    Thread.sleep(2500);
    } catch (InterruptedException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
    }
    }
    } else {
    SimpleSocketClient client = new SimpleSocketClient(configurations);
    client.run();
    }
    }

    //------------------------------------------------------------------------------

    private AppConfiguration theConfigurations;
    private Socket theSocket;
    private boolean isRunning = true;
    private OutputStream obStream = null;
    private PrintWriter obWriter = null;
    private long[] sendTimeRecorder = null;

    private SimpleSocketClient(AppConfiguration configurations) {

    this.theConfigurations = configurations;

    //prepare send time recorder
    //sendTimeRecorder = new long[theConfigurations.msgCount];
    //Arrays.fill(sendTimeRecorder, 0);
    }

    @Override
    public void run() {

    try {
    if ( !connectToServer() )
    return;

    if ( theConfigurations.printSocketDetails )
    printSocketDetials();

    setSocketConditioning();

    System.out.println(String.format("[%s] About to send msg len [%d] [%d] times ",
    new Date().toString(),
    theConfigurations.msgSize,
    theConfigurations.msgCount));

    if ( !theConfigurations.iterateThruByteRange ||
    ( theConfigurations.iterateThruByteRange && !theConfigurations.alreadyWaited) ) {

    waitToContinue();
    theConfigurations.alreadyWaited = true;
    }

    long startTimeInMillis = System.currentTimeMillis();

    sendMessages();

    System.out.println(String.format("All messages sent [%d] in timeMS [%d]",
    theConfigurations.msgCount,
    System.currentTimeMillis() - startTimeInMillis
    ));

    //printSendTimes();
    } catch (Exception e) {
    e.printStackTrace();
    } finally {
    try {

    if ( null != obWriter )
    obWriter.close();

    if ( null != theSocket )
    theSocket.close();
    } catch (Exception e) {
    e.printStackTrace();
    }
    }
    }

    private void sendMessages() {

    //prepare fixed length message
    byte[] fixedLengthMessage = new byte[theConfigurations.msgSize+1];
    Arrays.fill(fixedLengthMessage, (byte)0x41);
    fixedLengthMessage[fixedLengthMessage.length-1] = (byte)0x0D; //carriage return

    //long startTimeInNanos = 0;
    int messageCount = 0;

    while ( isRunning && messageCount < theConfigurations.msgCount ) {

    try {
    //startTimeInNanos = System.nanoTime();
    obStream.write(fixedLengthMessage);
    //sendTimeRecorder[messageCount] = System.nanoTime() - startTimeInNanos;
    } catch (IOException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
    }

    messageCount++;
    }
    }

    private boolean waitToContinue() {

    boolean exit = false;

    System.out.println("Hit any key to continue ...");
    try {
    int keyValue = System.in.read();

    if ( keyValue == 'q' )
    exit = true;
    } catch (IOException e1) {
    // TODO Auto-generated catch block
    e1.printStackTrace();
    }

    return exit;
    }

    private boolean connectToServer() {

    boolean connected = false;

    try {
    theSocket = new Socket(theConfigurations.host, theConfigurations.port);
    obStream = theSocket.getOutputStream();
    obWriter = new PrintWriter(theSocket.getOutputStream(), true);
    connected = true;

    } catch (UnknownHostException e) {
    System.err.println(String.format("Don't know about host: %s.", theConfigurations.host));
    System.exit(1);
    } catch (IOException e) {
    System.err.println(String.format("Couldn't get I/O for the connection to: %s.", theConfigurations.host));
    System.exit(1);
    }

    return connected;
    }

    private void printSocketDetials() {

    //write to file
    FileOutputStream fos = null;
    PrintWriter pw = null;
    try {

    fos = new FileOutputStream("socket-details.log", true);
    pw = new PrintWriter(fos);
    pw.println("Socket (Client) details ....");

    try {

    if ( null != this.theSocket ) {

    pw.println(String.format("IsConnected [%b]", this.theSocket.isConnected()));
    //SO_KEEPALIVE
    pw.println(String.format("getKeepAlive [%b]", this.theSocket.getKeepAlive()));
    //OOBINLINE
    pw.println(String.format("getOOBInline [%b]", this.theSocket.getOOBInline()));
    //SO_RCVBUF
    pw.println(String.format("getReceiveBufferSize [%d]", this.theSocket.getReceiveBufferSize()));
    //SO_REUSEADDR
    pw.println(String.format("getReuseAddress [%b]", this.theSocket.getReuseAddress()));
    //SO_SNDBUF
    pw.println(String.format("getSendBufferSize [%d]", this.theSocket.getSendBufferSize()));
    //SO_LINGER
    pw.println(String.format("getSoLinger [%d]", this.theSocket.getSoLinger()));
    //SO_TIMEOUT
    pw.println(String.format("getSoTimeout [%d]", this.theSocket.getSoTimeout()));
    //TCP_NODELAY
    pw.println(String.format("getTcpNoDelay [%b]", this.theSocket.getTcpNoDelay()));
    pw.println("----------------------------------------------------------");
    }
    } catch (Exception e) {
    e.printStackTrace();
    }

    System.out.println();
    } catch (FileNotFoundException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
    } finally {

    if ( null != pw ) {
    pw.flush();
    pw.close();

    if ( null != fos )
    pw.close();
    }
    }
    }

    private void setSocketConditioning() {

    try {

    if ( theConfigurations.rcvBufSpecified )
    theSocket.setReceiveBufferSize(theConfigurations.rcvBuf);
    if ( theConfigurations.sndBufSpecified )
    theSocket.setSendBufferSize(theConfigurations.sndBuf);
    if ( theConfigurations.naglingEnabledSpecified )
    theSocket.setTcpNoDelay(!theConfigurations.naglingEnabled);

    if ( ( theConfigurations.rcvBufSpecified ||
    theConfigurations.sndBufSpecified ||
    theConfigurations.naglingEnabledSpecified ) &&
    theConfigurations.printSocketDetails ) {

    printSocketDetials();
    }
    } catch (Exception e) {
    e.printStackTrace();
    }

    }

    private void printSendTimes() {

    //write to file
    FileOutputStream fos = null;
    PrintWriter pw = null;
    try {

    String fileName = String.format("sendTimes-%d-%d-%s.log",
    theConfigurations.msgSize,
    theConfigurations.msgCount,
    theConfigurations.naglingEnabledSpecified && !theConfigurations.naglingEnabled ? "WithNoNagle" : "WithNagle");

    fos = new FileOutputStream(fileName, false);
    pw = new PrintWriter(fos);

    pw.println(Arrays.toString(this.sendTimeRecorder));
    } catch (FileNotFoundException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
    } finally {

    if ( null != pw ) {
    pw.flush();
    pw.close();

    if ( null != fos )
    pw.close();
    }
    }

    }

    private static class AppConfiguration {

    public String host;
    public int port;
    public int msgSize;
    public int msgCount;
    @SuppressWarnings("unused")
    public int msgRatePerSecond;

    //socket conditioning
    public boolean printSocketDetails;

    public int rcvBuf;
    public boolean rcvBufSpecified;

    public int sndBuf;
    public boolean sndBufSpecified;

    public boolean naglingEnabled;
    public boolean naglingEnabledSpecified;

    //testing byte ranges
    public boolean iterateThruByteRange;
    public int startingMsgSize;
    public int numIterations;
    public boolean reduceMsgSize;
    public int msgSizeIncrement;
    public boolean msgSizeIncrementSpecified;
    public boolean alreadyWaited = false;

    public static AppConfiguration readAppConfiguration(String[] args) {

    AppConfiguration configurations = new AppConfiguration();

    try {

    configurations.host = args[0];
    configurations.port = Integer.parseInt(args[1]);
    configurations.msgSize = Integer.parseInt(args[2]);

    String[] msgCountDetails = args[3].split(":");
    configurations.msgCount = Integer.parseInt(msgCountDetails[0]);
    if ( msgCountDetails.length == 2 )
    configurations.msgRatePerSecond = Integer.parseInt(msgCountDetails[1]);

    if ( args.length > 4 ) {

    String[] socketConditioningDetails = args[4].split("\\|");
    configurations.printSocketDetails = Integer.parseInt(socketConditioningDetails[0]) == 1 ? true : false;

    if ( socketConditioningDetails.length > 1 ) {
    configurations.rcvBuf = Integer.parseInt(socketConditioningDetails[1]);
    configurations.rcvBufSpecified = true;
    }
    if ( socketConditioningDetails.length > 2 ) {
    configurations.sndBuf = Integer.parseInt(socketConditioningDetails[2]);
    configurations.sndBufSpecified = true;
    }
    if ( socketConditioningDetails.length > 3 ) {
    configurations.naglingEnabled = Integer.parseInt(socketConditioningDetails[3]) == 1 ? true : false;
    configurations.naglingEnabledSpecified = true;
    }
    }

    if ( args.length > 5 ) {

    String[] byteRangeSettings = args[5].split("\\|");
    configurations.iterateThruByteRange = Integer.parseInt(byteRangeSettings[0]) == 1 ? true : false;

    if ( byteRangeSettings.length > 1 )
    configurations.startingMsgSize = Integer.parseInt(byteRangeSettings[1]);
    if ( byteRangeSettings.length > 2 )
    configurations.numIterations = Integer.parseInt(byteRangeSettings[2]);
    if ( byteRangeSettings.length > 3 )
    configurations.reduceMsgSize = Integer.parseInt(byteRangeSettings[3]) == 1 ? true : false;
    if ( byteRangeSettings.length > 4 ) {

    configurations.msgSizeIncrement = Integer.parseInt(byteRangeSettings[4]);
    configurations.msgSizeIncrementSpecified = true;
    }

    }

    } catch (Exception e) {
    System.out.println(String.format("Exception caught while parsin app configurations, %s", e.getMessage()));
    return null;
    }

    return configurations;
    }


    }

    }

    命令行

    服务器:
    java –cp . SimpleMultiClientServer 9090 [“printSocketConditioning|rbuffer size”]

    可选 printSocketConditioning : 1/0,打印到 'socketserver-details.log' rbuffer size : 以字节为单位的接收缓冲区

    例如 java –cp . SimpleMultiClientServer 9090
    客户:

    基本:
    java –cp . SimpleSocketClient localhost 9090 msgSize numMessages
    msgSize : 消息大小(以字节为单位) numMessages : 要发送的消息数

    例如 java –cp . SimpleSocketClient localhost 9090 100 1000000
    设置 socket 调节:
    java –cp . SimpleSocketClient localhost 9090 <msgSize> <numMessages> “printSocketConditioning|rbuffer|sbuffer|nagle on”]
    printSocketConditioning: 1/0 rbuffer : 接收缓冲区大小(以字节为单位)
    sbuffer : 以字节为单位发送缓冲区大小
    nagle on: 1/0 (相当于 TcpNoDelay 关/开)

    例如 java –cp . SimpleSocketClient localhost 9090 100 1000000 “1|8192|8192|1”
    通过字节范围迭代的套接字调节:
    java –cp . SimpleSocketClient localhost 9090 msgSize numMessages [“printSocketConditioning|rbuffer|sbuffer|nagle on” 
    [“iterateByteRange|startSize|numIterations|reduceMsgCount|incMsgSizeBy”]
    ]
    iterateByteRange: 1/0 ,从消息大小开始执行一系列测试 startSize并通过加倍(默认)或 incMsgSizeBy 递增, 重复 numIterations startSize : 以字节为单位的消息起始大小(例如 100)
    numIterations :例如1000000
    reduceMsgCount: 1/0 , 每次迭代减半或保持不变
    incMsgSizeBy :在每次迭代时按字节增加消息大小(例如 100)

    *例如。 java –cp . SimpleSocketClient localhost 9090 100 1000000 “1|8192|8192|1” "1|100|20|0|100"*
    请注意,上面发布的所有结果都是在相同的硬件上执行的,客户端和服务器都在同一台机器上。

    最佳答案

    Microsoft 在 Windows(客户端)操作系统中专门针对 TCP 连接实现了限制,以防止将其用作服务器 (reference)。在 Windows Server 2008 R2 上再次运行您的测试,看看是否有任何不同。

    关于java - 谁能解释 Windows (7 Pro N) 和 Linux ( Ubuntu 12-04) 之间的套接字吞吐量差异,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/11565777/

    24 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com