gpt4 book ai didi

c# - 确保调用 Socket.XXXAsync 的线程保持事件状态以完成 IO 请求(IO 完成端口,C#)

转载 作者:行者123 更新时间:2023-12-03 12:06:40 27 4
gpt4 key购买 nike

我正在实现一个小型库来使用 System.Net.Sockets.Socket更轻松。它应该处理任意数量的监听和接收 TCP 套接字,并且实现速度快很重要。

我正在使用 XXXAsync方法和 CLR ThreadPool回调库用户的委托(delegate)(例如,每当成功发送消息或接收到某些数据时)。最好该库不会自行启动任何线程。

图书馆的用户可以访问我的Sockets的界面。用于发送消息或开始接收消息的包装器(在许多其他方法和重载中):

public interface IClientSocket {
// will eventually call Socket.SendAsync
IMessageHandle SendAsync(byte[] buffer, int offset, int length);

// will eventually call Socket.RecieveAsync
void StartReceiveAsync();
}
XXXAsync方法使用 IO 完成端口。因此,调用这些方法的线程必须保持事件状态,直到操作完成,否则操作将失败并返回 SocketError.OperationAborted。 (我认为是这样,还是不是?)。
对图书馆的用户施加这样的限制是丑陋的并且容易出错。

这里最好的选择是什么?
  • 调用ThreadPool.QueueUserWorkItem使用委托(delegate)调用 XXXAsync方法?那安全吗?我在某处读到,ThreadPool 不会停止具有任何 IOCP 的空闲线程。那会很好,因为它解决了上面的问题。
    但是对于许多 TCP 连接,它也可能很糟糕。
    在这种情况下,很可能每个 ThreadPool 线程都调用了未决的 ReceiveAsync 之一。来电。因此,即使当前工作负载很低并且许多线程处于空闲状态(并且浪费内存),线程池也永远不会缩小。
  • 启动一个始终处于事件状态并调用 XXXAsync 的专用线程方法。例如,当一个库用户想要发送数据时,它将一个委托(delegate)放入同步队列,线程弹出它并调用 SendAsync方法。
    我不太喜欢这个解决方案,因为它浪费了一个线程,并且在多核机器上,发送只能由一个线程执行。

  • 此外,这两种解决方案都不是最好的,因为它们将调用异步方法的工作传递给另一个线程。可以避免吗?

    你怎么看? (谢谢!!)

    托马斯

    编辑1:

    我可以重现 SocketError.OperationAborted以下测试程序有问题(我认为它是正确的)。
    编译、启动并 telnet 到端口 127.0.0.1:10000。发送“t”或“T”并等待 > 3 秒。发送“T”时, ReceiveAsync调用在 ThreadPool 中完成(工作),使用“t”启动一个新线程,该线程在 3 秒后终止(失败)。
    using System;
    using System.Net;
    using System.Net.Sockets;
    using System.Text;
    using System.Threading;
    using System.Collections.Generic;

    namespace Test {
    class Program {
    static List<Socket> _referenceToSockets = new List<Socket>();
    static void Main(string[] args) {
    Thread.CurrentThread.Name = "Main Thread";

    // Create a listening socket on Port 10000.
    Socket ServerSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
    _referenceToSockets.Add(ServerSocket);
    var endPoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 10000);
    ServerSocket.Bind(endPoint);
    ServerSocket.Listen(50);

    // Start listening.
    var saeaAccept = new SocketAsyncEventArgs();
    saeaAccept.Completed += OnCompleted;
    ServerSocket.AcceptAsync(saeaAccept);

    Console.WriteLine(String.Format("Listening on {0}.", endPoint));
    Console.ReadLine();
    }

    private static void OnCompleted(object obj, SocketAsyncEventArgs evt) {
    var socket = (Socket)obj;
    Console.WriteLine(String.Format("Async operation completed: {0}; Error: {1}; Callback-Thread: \"{2}\" ({3} threadpool)", evt.LastOperation, evt.SocketError, Thread.CurrentThread.Name, Thread.CurrentThread.IsThreadPoolThread?"is":"no"));
    switch (evt.LastOperation) {
    case SocketAsyncOperation.Accept:
    // Client connected. Listen for more.
    Socket clientSocket = evt.AcceptSocket;
    _referenceToSockets.Add(clientSocket);
    evt.AcceptSocket = null;
    socket.AcceptAsync(evt);

    // Start receiving data.
    var saeaReceive = new SocketAsyncEventArgs();
    saeaReceive.Completed += OnCompleted;
    saeaReceive.SetBuffer(new byte[1024], 0, 1024);
    clientSocket.ReceiveAsync(saeaReceive);
    break;
    case SocketAsyncOperation.Disconnect:
    socket.Close();
    evt.Dispose();
    break;
    case SocketAsyncOperation.Receive:
    if (evt.SocketError != SocketError.Success) {
    socket.DisconnectAsync(evt);
    return;
    }
    var asText = Encoding.ASCII.GetString(evt.Buffer, evt.Offset, evt.BytesTransferred);
    Console.WriteLine(String.Format("Received: {0} bytes: \"{1}\"", evt.BytesTransferred, asText));
    if (evt.BytesTransferred == 0) {
    socket.Close();
    evt.Dispose();
    }
    if (asText.ToUpper().StartsWith("T")) {
    Action<object> action = (object o) => {
    socket.ReceiveAsync(evt);
    Console.WriteLine(String.Format("Called ReceiveAsync {0}...", o));
    Thread.Sleep(3000);
    Console.WriteLine("End of Action...");
    };
    if (asText.StartsWith("T")) {
    ThreadPool.QueueUserWorkItem(o=>action(o), "in ThreadPool");
    } else {
    new Thread(o=>action(o)).Start("in a new Thread");
    }
    } else {
    socket.ReceiveAsync(evt);
    }
    break;
    }
    }
    }
    }

    编辑#3

    以下是我打算使用的解决方案:

    我让库用户的线程调用 XXXAsync ( SendAsync 除外)直接操作。在大多数情况下,调用会成功(因为调用线程很少会终止)。
    如果操作失败并返回 SocketError.OperationAborted ,库只是使用来自异步回调的当前线程再次调用操作。这个是 ThreadPool线程并且它很有可能成功(将设置一个附加标志,如果 SocketError.OperationAborted 的原因是由于某些其他错误引起的,则最多使用此解决方法一次)。
    这应该可以工作,因为套接字本身仍然可以,只是之前的操作失败了。

    对于 SendAsync ,此解决方法不起作用,因为它可能会弄乱消息的顺序。在这种情况下,我会将消息排队到 FIFO 列表中。我将使用 ThreadPool将它们出列并通过 SendAsync 发送它们.

    最佳答案

    这里有几个危险信号。如果您在保持线程活跃时遇到问题,那么您的线程没有做任何有用的事情。在这种情况下,使用 Async 方法没有意义。如果速度是您唯一关心的问题,那么您不应该使用 Async 方法。他们要求 Socket 抓取一个 tp 线程来进行回调。这是很小的开销,但如果常规线程进行阻塞调用,则没有。

    当您需要在处理许多同时连接时使您的应用程序很好地扩展时,您应该只考虑 Async 方法。这会产生非常突发的 cpu 负载,非常适合 tp 线程。

    关于c# - 确保调用 Socket.XXXAsync 的线程保持事件状态以完成 IO 请求(IO 完成端口,C#),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/5465807/

    27 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com