gpt4 book ai didi

c# - 在 C# 中重构异步套接字编程

转载 作者:行者123 更新时间:2023-12-03 11:56:59 24 4
gpt4 key购买 nike

我有一堆我想通过 C# 套接字公开的异步方法。 MSDN 中的一般模式 documentaions具有以下形式:

 public static void StartListening()
{
...
IPEndPoint localEndPoint = new IPEndPoint(ipAddress, 11000);
...
listener.BeginAccept(new AsyncCallback(AcceptCallback), listener);
...
}

public static void AcceptCallback(IAsyncResult ar)
{
...
handler.BeginReceive( state.buffer, 0, StateObject.BufferSize, 0,
new AsyncCallback(ReadCallback), state);
}

public static void ReadCallback(IAsyncResult ar)
{
...
StateObject state = (StateObject) ar.AsyncState;
...
CalculateResult(state);
...
handler.BeginReceive(state.buffer, 0, StateObject.BufferSize, 0,
new AsyncCallback(ReadCallback), state);
...
}
因此,在不重复代码的情况下以简洁明了的形式编写所有这些内容一直是​​一个挑战。我正在沿着这些思路思考,但无法将这些点联系起来:
    public static void StartListeningMaster()
{
string ipAddress = "localhost";
IPHostEntry ipHost = Dns.GetHostEntry(ipAddress);
IPAddress address = ipHost.AddressList[0];

StartListening(50000, address, AcceptCallback1);
StartListening(50001, address, AcceptCallback2);
StartListening(50002, address, AcceptCallback3);
...
}

public static void StartListening(int port, IPAddress ipAddress,
Action<IAsyncResult> acceptCallback) {...}
public static void AcceptCallback1(IAsyncResult ar)
{
...
handler.BeginReceive(state.buffer, 0, StateObject.BufferSize,
0, new AsyncCallback1(ReadCallback1), state);
}
...
到目前为止,这工作正常。但是为了正确地重构它,我希望有一个 AcceptCallback 方法,它的参数是一个通用的 ReadCallback,它的参数是一个 CalculateResult 方法。这样我就不会有任何重复的代码。但是,如果我修改我的 AcceptCallback 方法以采用比 IAsyncResult 更多的参数(例如:
  public static void StartListening(int port, IPAddress ipAddress, Action<IAsyncResult, Action<IAsyncResult>> acceptCallback) {...}

public static void AcceptCallback(IAsyncResult ar, Action<IAsyncResult> readCallback) {}

我打破了 AsyncCallback 委托(delegate)契约(Contract)。
public delegate void AsyncCallback(IAsyncResult ar);
然后我研究扩展现有接口(interface)以允许该功能。我研究了扩展
public interface IAsyncResult
但这似乎也不是正确的方法。那么,我该如何编写这段代码,这样我就不会到处复制和粘贴几乎相同的代码?

最佳答案

所以我解决这个问题的方法是将基本组件移动到它们自己的抽象对象中。然后建立在这些对象之上。例如,服务器只需要接受/跟踪连接。所以我会创建一个看起来像这样的服务器对象:

namespace MultiServerExample.Base
{
public interface IAsyncServerBase
{
void StartListening();
bool IsListening { get; }
void StopListening();
void WriteDataToAllClients(byte[] data);
}

public abstract class AsyncServerBase<TClientBase> : IAsyncServerBase
where TClientBase : IAsyncClientBase, new()
{
// implement a TcpListener to gain access to Active property
private sealed class ActiveTcpListener : TcpListener
{
public ActiveTcpListener(IPAddress localaddr, int port)
: base(localaddr, port) { }
public bool IsActive => Active;
}

// our listener object
private ActiveTcpListener Listener { get; }
// our clients
private ConcurrentDictionary<string, TClientBase> Clients { get; }

// construct with a port
public AsyncServerBase(int port)
{
Clients = new ConcurrentDictionary<string, TClientBase>();
Listener = new ActiveTcpListener(IPAddress.Any, port);
}

// virtual methods for client action
public virtual void OnClientConnected(TClientBase client) { }
public virtual void OnClientDisconnected(TClientBase client, Exception ex) { }

// start the server
public void StartListening()
{
if(!IsListening)
{
Listener.Start();
Listener.BeginAcceptTcpClient(OnAcceptedTcpClient, this);
}
}

// check if the server is running
public bool IsListening =>
Listener.IsActive;

// stop the server
public void StopListening()
{
if (IsListening)
{
Listener.Stop();
Parallel.ForEach(Clients, x => x.Value.DetachClient(null));
Clients.Clear();
}
}

// async callback for when a client wants to connect
private static void OnAcceptedTcpClient(IAsyncResult res)
{
var me = (AsyncServerBase<TClientBase>)res.AsyncState;

if (!me.IsListening) { return; }

try
{
TcpClient client = null;
try
{
client = me.Listener.EndAcceptTcpClient(res);
}
catch(Exception ex)
{
System.Diagnostics.Debug.WriteLine($"Warning: unable to accept client:\n{ex}");
}

if(client != null)
{
// create a new client
var t = new TClientBase();
// set up error callbacks
t.Error += me.OnClientBaseError;
// notify client we have attached
t.AttachClient(client);
// track the client
me.Clients[t.Id] = t;
// notify we have a new connection
me.OnClientConnected(t);
}
}
finally
{
// if we are still listening, wait for another connection
if(me.IsListening)
{
me.Listener.BeginAcceptSocket(OnAcceptedTcpClient, me);
}
}
}

// Event callback from a client that an error has occurred
private void OnClientBaseError(object sender, AsyncClientBaseErrorEventArgs e)
{
var client = (TClientBase)sender;
client.Error -= OnClientBaseError;

OnClientDisconnected(client, e.Exception);

client.DetachClient(e.Exception);
Clients.TryRemove(client.Id, out _);
}

// utility method to write data to all clients connected
public void WriteDataToAllClients(byte[] data)
{
Parallel.ForEach(Clients, x => x.Value.WriteData(data));
}
}
}
至此,运行服务器的所有基础知识都已经讲完了。现在对于在服务器上运行的客户端:
namespace MultiServerExample.Base
{
public interface IAsyncClientBase
{
event EventHandler<AsyncClientBaseErrorEventArgs> Error;
void AttachClient(TcpClient client);
void WriteData(byte[] data);
void DetachClient(Exception ex);
string Id { get; }
}

public abstract class AsyncClientBase : IAsyncClientBase
{
protected virtual int ReceiveBufferSize { get; } = 1024;
private TcpClient Client { get; set; }
private byte[] ReceiveBuffer { get; set; }
public event EventHandler<AsyncClientBaseErrorEventArgs> Error;
public string Id { get; }

public AsyncClientBase()
{
Id = Guid.NewGuid().ToString();
}

public void AttachClient(TcpClient client)
{
if(ReceiveBuffer != null) { throw new InvalidOperationException(); }

ReceiveBuffer = new byte[ReceiveBufferSize];
Client = client;

try
{
Client.GetStream().
BeginRead(ReceiveBuffer, 0, ReceiveBufferSize, OnDataReceived, this);
OnAttachedToServer();
}
catch (Exception ex)
{
Error?.Invoke(this,
new AsyncClientBaseErrorEventArgs(ex, "BeginRead"));
}
}

public void DetachClient(Exception ex)
{
try
{
Client.Close();
OnDetachedFromServer(ex);
}
catch { /* intentionally swallow */ }

Client = null;
ReceiveBuffer = null;
}

public virtual void OnDataReceived(byte[] buffer) { }
public virtual void OnAttachedToServer() { }
public virtual void OnDetachedFromServer(Exception ex) { }

public void WriteData(byte[] data)
{
try
{
Client.GetStream().BeginWrite(data, 0, data.Length, OnDataWrote, this);
}
catch(Exception ex)
{
Error?.Invoke(this, new AsyncClientBaseErrorEventArgs(ex, "BeginWrite"));
}
}

private static void OnDataReceived(IAsyncResult iar)
{
var me = (AsyncClientBase)iar.AsyncState;

if(me.Client == null) { return; }

try
{
var bytesRead = me.Client.GetStream().EndRead(iar);
var buf = new byte[bytesRead];
Array.Copy(me.ReceiveBuffer, buf, bytesRead);

me.OnDataReceived(buf);
}
catch (Exception ex)
{
me.Error?.Invoke(me, new AsyncClientBaseErrorEventArgs(ex, "EndRead"));
}
}

private static void OnDataWrote(IAsyncResult iar)
{
var me = (AsyncClientBase)iar.AsyncState;
try
{
me.Client.GetStream().EndWrite(iar);
}
catch(Exception ex)
{
me.Error?.Invoke(me,
new AsyncClientBaseErrorEventArgs(ex, "EndWrite"));
}
}
}
}
现在你所有的基本代码都写好了。您无需以任何方式更改此设置。您只需实现自己的客户端和服务器即可做出相应的响应。例如,这是一个基本的服务器实现:
public class MyServer : AsyncServerBase<MyClient>
{
public MyServer(int port) : base(port)
{
}

public override void OnClientConnected(MyClient client)
{
Console.WriteLine($"* MyClient connected with Id: {client.Id}");
base.OnClientConnected(client);
}

public override void OnClientDisconnected(MyClient client, Exception ex)
{
Console.WriteLine($"***** MyClient disconnected with Id: {client.Id} ({ex.Message})");
base.OnClientDisconnected(client, ex);
}
}
这是上面服务器用于通信的客户端:
public class MyClient : AsyncClientBase
{
public override void OnAttachedToServer()
{
base.OnAttachedToServer();

Console.WriteLine($"{Id}: {GetType().Name} attached. Waiting for data...");
}

public override void OnDataReceived(byte[] buffer)
{
base.OnDataReceived(buffer);

Console.WriteLine($"{Id}: {GetType().Name} recieved {buffer.Length} bytes. Writing 5 bytes back.");
WriteData(new byte[] { 1, 2, 3, 4, 5 });
}

public override void OnDetachedFromServer(Exception ex)
{
base.OnDetachedFromServer(ex);

Console.WriteLine($"{Id}: {GetType().Name} detached.");
}
}
为了强调这一点,这里有另一个客户端,它可以简单地插入相同的服务器实现,但赋予它不同的特性:
public class MyOtherClient : AsyncClientBase
{
public override void OnAttachedToServer()
{
base.OnAttachedToServer();

Console.WriteLine($"{Id}: {GetType().Name} attached. Writing 4 bytes back.");
WriteData(new byte[] { 1, 2, 3, 4 });
}

public override void OnDataReceived(byte[] buffer)
{
base.OnDataReceived(buffer);

Console.WriteLine($"{Id}: {GetType().Name} recieved {buffer.Length} bytes.");
}

public override void OnDetachedFromServer(Exception ex)
{
base.OnDetachedFromServer(ex);

Console.WriteLine($"{Id}: {GetType().Name} detached.");
}
}
至于使用它,这里有一个小测试程序,它通过压力测试:
    class Program
{
static void Main(string[] args)
{
var servers = new IAsyncServerBase[]
{
new MyServer(50000),
new MyServer(50001),
new MyOtherServer(50002)
};

foreach (var s in servers)
{
s.StartListening();
}

RunTestUsingMyServer("1", 89, 50000);
RunTestUsingMyServer("2", 127, 50001);
RunTestUsingMyOtherServer("3", 88, 50002);

Console.Write("Press any key to exit... ");
Console.ReadKey(true);

foreach (var s in servers)
{
s.WriteDataToAllClients(new byte[] { 1, 2, 3, 4, 5 });
s.StopListening();
}
}

private static void RunTestUsingMyServer(string name, int clientCount, int port)
{
Parallel.For(0, clientCount, x =>
{
using (var t = new TcpClient())
{
t.Connect(IPAddress.Loopback, port);
t.GetStream().Write(new byte[] { 1, 2, 3, 4, 5 }, 0, 5);
t.GetStream().Read(new byte[512], 0, 512);
t.Close();
}
Console.WriteLine($"FINISHED PASS {name} #{x}");
});
}

private static void RunTestUsingMyOtherServer(string name, int clientCount, int port)
{
Parallel.For(0, clientCount, x =>
{
using (var t = new TcpClient())
{
t.Connect(IPAddress.Loopback, port);
t.GetStream().Read(new byte[512], 0, 512);
t.GetStream().Write(new byte[] { 1, 2, 3, 4, 5, 6 }, 0, 6);
t.Close();
}
Console.WriteLine($"FINISHED PASS {name} #{x}");
});
}
}
如果有兴趣,这里是 full source code你可以 checkout 。希望这可以使您达到与重用代码有关的目标。

关于c# - 在 C# 中重构异步套接字编程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63905911/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com