gpt4 book ai didi

c# - 用于 UDP 服务器的 C# 可等待 SocketAsyncEventArgs 包装器; socket.ReceiveAsync 返回无效的 RemoteEndPoint (0.0.0.0 :0)

转载 作者:行者123 更新时间:2023-12-03 12:02:57 25 4
gpt4 key购买 nike

正如标题所说,我正在直接基于原始套接字编写一个 UDP 服务器,并使用 SocketAsyncEventArgs,因为我想写出一个运行速度很快的实现。

我知道 UdpClient 存在,并且有更简单的方法来编写服务器,但我想了解如何正确使用 SocketAsyncEventArgs 和 socket.ReceiveFromAsync/socket.SendToAsync 方法来实现他们声称的“增强吞吐量”和“更好的可扩展性”(MSDN Docs for SocketAsyncEventArgs)。

我基本上遵循了 MSDN 示例,因为我认为这是了解这些方法如何工作的一个不错的起点,但遇到了一些问题。服务器起初可以正常工作并回显接收到的字节,但会随机地“失败”,无法得到发送方的正确地址:RemoteEndPoint 会被填充为 UDP 的“占位符”EndPoint {0.0.0.0:0}(暂且这么称呼,因为没有更好的术语),而不是正确的 localhost 客户端地址(例如 127.0.0.1:7007)。 Image showing the problem (控制台是波兰语,抱歉。请相信我,SocketException 的消息是“所需地址在此上下文中无效”)。

我有时会从 MSDN 示例中大量删除 block ,只更改为 socket.ReceiveFromAsync 调用填充在 SocketAsyncEventArgs 实例上的字段(根据 MSDN 文档 -> socket.ReceiveFromAsync Docs ),最终结果仍然是相同的。此外,这是一个间歇性的问题,而不是一个恒定的问题。据我所知,服务器没有时间会持续出错。

到目前为止,我的想法是 UdpServer 的状态问题、UdpClient 端的一些不一致或 TaskCompletionSource 的滥用。

编辑1:

我觉得我应该解决我使用 SocketAsyncEventArgs 的原因。我完全理解有更简单的方法来发送和接收数据。 async/await 套接字扩展是解决这个问题的好方法,也是我最初做的方式。我想对 async/await 与较旧的 api SocketArgs 进行基准测试,看看这两种方法有多大不同。

下面包含 UdpClient、UdpServer 和共享结构的代码。如果 StackOverflow 允许,我也可以尝试按需提供更多代码。

感谢您花时间帮助我。

测试代码

/// <summary>
/// Runs the echo benchmark: starts a <c>UdpServer</c> on 127.0.0.1:12345 and a
/// <c>UdpClient</c> bound to port 7007 that sends one million packets, waiting for the
/// echo of each one, then prints the elapsed time and an approximate bandwidth.
/// </summary>
/// <returns>
/// A task that completes only when both the server loop and the client loop have ended.
/// </returns>
private static async Task TestNetworking()
{
    EndPoint serverEndPoint = new IPEndPoint(IPAddress.Loopback, 12345);

    // The async local function is invoked directly, so it runs synchronously up to its
    // first pending await: the server socket is bound and the first receive is posted
    // before the client sends anything.
    Task serverTask = RunServerAsync();

    // Task.Run unwraps the async lambda. Task.Factory.StartNew(async ...) returns a
    // Task<Task> whose outer task completes at the first await, which left the client's
    // real work (and its exceptions) unobserved.
    Task clientTask = Task.Run(() => RunClientAsync());

    await Task.WhenAll(serverTask, clientTask);

    async Task RunServerAsync()
    {
        try
        {
            SocketServer server = new UdpServer();

            if (server.Bind(serverEndPoint))
            {
                await server.StartAsync();
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex);
        }
    }

    async Task RunClientAsync()
    {
        SocketClient client = new UdpClient();
        bool bound = client.Bind(new IPEndPoint(IPAddress.Any, 7007));
        if (bound)
        {
            Console.WriteLine($"[Client] Bound client socket!");
        }

        if (!bound || !client.Connect(serverEndPoint))
        {
            return;
        }

        Console.WriteLine($"[Client] Connected to {serverEndPoint}!");

        // The client tracks the peer in its own local; the captured serverEndPoint is
        // also read by the server and must not be reassigned from here.
        EndPoint remoteEndPoint = serverEndPoint;

        byte[] message = Encoding.UTF8.GetBytes("Hello World!");

        Stopwatch stopwatch = new Stopwatch();

        const int packetsToSend = 1_000_000;

        for (int i = 0; i < packetsToSend; i++)
        {
            try
            {
                // Start() on a running stopwatch is a no-op, so time spent in a failed
                // round trip keeps accumulating until the next successful Stop().
                stopwatch.Start();

                await client.SendAsync(remoteEndPoint, message, SocketFlags.None);

                ReceiveResult result = await client.ReceiveAsync(remoteEndPoint, SocketFlags.None);

                remoteEndPoint = result.RemoteEndPoint;

                stopwatch.Stop();
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex);

                // Retry the same packet after a short pause.
                i--;
                await Task.Delay(1);
            }
        }

        double approxBandwidth = (packetsToSend * message.Length) / (1_000_000.0 * (stopwatch.ElapsedMilliseconds / 1000.0));

        Console.WriteLine($"Sent {packetsToSend} packets of {message.Length} bytes in {stopwatch.ElapsedMilliseconds:N} milliseconds.");
        Console.WriteLine($"Approximate bandwidth: {approxBandwidth} MBps");
    }
}

共享代码
/// <summary>
/// Immutable description of one received datagram: the buffer holding it, the number of
/// bytes received, and the endpoint it came from.
/// </summary>
public readonly struct ReceiveResult
{
    /// <summary>Size, in bytes, of the buffers used for a single packet.</summary>
    public const int PacketSize = 1024;

    /// <summary>Buffer that holds the received data.</summary>
    public readonly Memory<byte> Contents;

    /// <summary>Number of bytes reported as received.</summary>
    public readonly int ReceivedBytes;

    /// <summary>Endpoint reported as the sender of the data.</summary>
    public readonly EndPoint RemoteEndPoint;

    /// <summary>Creates a result from its three components.</summary>
    /// <param name="contents">Buffer that holds the received data.</param>
    /// <param name="receivedBytes">Number of bytes received.</param>
    /// <param name="remoteEndPoint">Endpoint the data was received from.</param>
    public ReceiveResult(Memory<byte> contents, int receivedBytes, EndPoint remoteEndPoint)
        => (Contents, ReceivedBytes, RemoteEndPoint) = (contents, receivedBytes, remoteEndPoint);
}

UDP客户端
/// <summary>
/// UDP implementation of <c>SocketClient</c> built on the task-based socket methods.
/// </summary>
public class UdpClient : SocketClient
{
    /*
    Base class, reproduced here for reference:

    public abstract class SocketClient
    {
    protected readonly Socket socket;

    protected SocketClient(AddressFamily addressFamily, SocketType socketType, ProtocolType protocolType)
    {
    socket = new Socket(addressFamily, socketType, protocolType);
    }

    public bool Bind(in EndPoint localEndPoint)
    {
    try
    {
    socket.Bind(localEndPoint);

    return true;
    }
    catch (Exception ex)
    {
    Console.WriteLine(ex);

    return false;
    }
    }

    public bool Connect(in EndPoint remoteEndPoint)
    {
    try
    {
    socket.Connect(remoteEndPoint);

    return true;
    }
    catch (Exception ex)
    {
    Console.WriteLine(ex);

    return false;
    }
    }

    public abstract Task<ReceiveResult> ReceiveAsync(EndPoint remoteEndPoint, SocketFlags socketFlags);

    public abstract Task<int> SendAsync(EndPoint remoteEndPoint, ArraySegment<byte> buffer, SocketFlags socketFlags);
    }
    */

    /// <summary>Creates an IPv4 datagram (UDP) client socket.</summary>
    public UdpClient() : base(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp)
    {
    }

    /// <summary>Receives one datagram into a freshly allocated packet-sized buffer.</summary>
    /// <param name="remoteEndPoint">Endpoint passed to the underlying receive call.</param>
    /// <param name="socketFlags">Flags passed to the underlying receive call.</param>
    /// <returns>
    /// The whole <see cref="ReceiveResult.PacketSize"/>-byte buffer, the number of bytes
    /// actually received, and the endpoint reported by the socket.
    /// </returns>
    public override async Task<ReceiveResult> ReceiveAsync(EndPoint remoteEndPoint, SocketFlags socketFlags)
    {
        byte[] buffer = new byte[ReceiveResult.PacketSize];

        SocketReceiveFromResult result =
            await socket.ReceiveFromAsync(new ArraySegment<byte>(buffer), socketFlags, remoteEndPoint);

        return new ReceiveResult(buffer, result.ReceivedBytes, result.RemoteEndPoint);
    }

    /// <summary>Sends <paramref name="buffer"/> to <paramref name="remoteEndPoint"/>.</summary>
    /// <param name="remoteEndPoint">Destination of the datagram.</param>
    /// <param name="buffer">Payload to send.</param>
    /// <param name="socketFlags">Flags passed to the underlying send call.</param>
    /// <returns>The number of bytes sent.</returns>
    public override async Task<int> SendAsync(EndPoint remoteEndPoint, ArraySegment<byte> buffer, SocketFlags socketFlags)
    {
        // The segment is handed to the socket as-is; copying it with ToArray() first
        // cost an allocation and a copy on every send.
        return await socket.SendToAsync(buffer, socketFlags, remoteEndPoint);
    }
}

UDP 服务器
/// <summary>
/// UDP echo server that wraps <see cref="SocketAsyncEventArgs"/> operations in awaitable
/// tasks. Receive and send operations draw their event-args objects from separate pools.
/// </summary>
public class UdpServer : SocketServer
{
    /*
    Base class, reproduced here for reference:

    public abstract class SocketServer
    {
    protected readonly Socket socket;

    protected SocketServer(AddressFamily addressFamily, SocketType socketType, ProtocolType protocolType)
    {
    socket = new Socket(addressFamily, socketType, protocolType);
    }

    public bool Bind(in EndPoint localEndPoint)
    {
    try
    {
    socket.Bind(localEndPoint);

    return true;
    }
    catch (Exception ex)
    {
    Console.WriteLine(ex);

    return false;
    }
    }

    public abstract Task StartAsync();
    }
    */

    private const int MaxPooledObjects = 100;
    private readonly ConcurrentDictionary<EndPoint, ConcurrentQueue<byte[]>> clients;

    private readonly ArrayPool<byte> receiveBufferPool = ArrayPool<byte>.Create(ReceiveResult.PacketSize, MaxPooledObjects);

    private readonly ObjectPool<SocketAsyncEventArgs> receiveSocketAsyncEventArgsPool =
        new DefaultObjectPool<SocketAsyncEventArgs>(new DefaultPooledObjectPolicy<SocketAsyncEventArgs>(), MaxPooledObjects);

    private readonly ObjectPool<SocketAsyncEventArgs> sendSocketAsyncEventArgsPool =
        new DefaultObjectPool<SocketAsyncEventArgs>(new DefaultPooledObjectPolicy<SocketAsyncEventArgs>(), MaxPooledObjects);

    /// <summary>
    /// Completion callback for operations that finished asynchronously. Releases the
    /// pooled resources first and only then completes the operation's task.
    /// </summary>
    /// <param name="sender">Event source; not used.</param>
    /// <param name="eventArgs">The event-args object whose operation completed.</param>
    private void HandleIOCompleted(object? sender, SocketAsyncEventArgs eventArgs)
    {
        // Every rent subscribes this handler once, so every release must unsubscribe it
        // once; otherwise a reused object would raise the callback several times.
        eventArgs.Completed -= HandleIOCompleted;

        // Snapshot the outcome before the object goes back to a pool, where another
        // operation may pick it up and overwrite these properties.
        SocketError socketError = eventArgs.SocketError;
        int bytesTransferred = eventArgs.BytesTransferred;

        switch (eventArgs.LastOperation)
        {
            case SocketAsyncOperation.SendTo:
            {
                AsyncWriteToken writeToken = (AsyncWriteToken)eventArgs.UserToken!;

                sendSocketAsyncEventArgsPool.Return(eventArgs);

                if (socketError != SocketError.Success)
                {
                    writeToken.CompletionSource.TrySetException(new SocketException((int)socketError));
                }
                else
                {
                    writeToken.CompletionSource.TrySetResult(bytesTransferred);
                }

                break;
            }

            case SocketAsyncOperation.ReceiveFrom:
            {
                AsyncReadToken readToken = (AsyncReadToken)eventArgs.UserToken!;
                EndPoint remoteEndPoint = eventArgs.RemoteEndPoint!;

                if (socketError == SocketError.Success)
                {
                    eventArgs.MemoryBuffer.CopyTo(readToken.OutputBuffer);
                }

                receiveBufferPool.Return(readToken.RentedBuffer);
                receiveSocketAsyncEventArgsPool.Return(eventArgs);

                if (socketError != SocketError.Success)
                {
                    readToken.CompletionSource.TrySetException(new SocketException((int)socketError));
                }
                else
                {
                    readToken.CompletionSource.TrySetResult(
                        new ReceiveResult(readToken.OutputBuffer, bytesTransferred, remoteEndPoint));
                }

                break;
            }

            case SocketAsyncOperation.Disconnect:
                // Placeholder: handle the client closing the connection on tcp servers at some point.
                break;

            case SocketAsyncOperation.Accept:
            case SocketAsyncOperation.Connect:
            case SocketAsyncOperation.None:
                break;
        }
    }

    /// <summary>
    /// Posts one receive and returns a task for its result. The datagram is received into
    /// a rented buffer and copied into <paramref name="outputBuffer"/>.
    /// </summary>
    /// <param name="remoteEndPoint">Endpoint assigned to the operation before it is posted.</param>
    /// <param name="socketFlags">Flags for the receive operation.</param>
    /// <param name="outputBuffer">Caller-owned buffer that receives the data.</param>
    /// <returns>
    /// A task yielding <paramref name="outputBuffer"/>, the byte count and the sender;
    /// it faults with a <see cref="SocketException"/> when the socket reports an error.
    /// </returns>
    private Task<ReceiveResult> ReceiveAsync(EndPoint remoteEndPoint, SocketFlags socketFlags, Memory<byte> outputBuffer)
    {
        // Continuations must not run inline on the I/O completion callback.
        TaskCompletionSource<ReceiveResult> tcs =
            new TaskCompletionSource<ReceiveResult>(TaskCreationOptions.RunContinuationsAsynchronously);

        byte[] buffer = receiveBufferPool.Rent(ReceiveResult.PacketSize);

        // Never receive more than the caller's buffer can hold, so the copy cannot overflow.
        Memory<byte> memoryBuffer = new Memory<byte>(buffer, 0, Math.Min(buffer.Length, outputBuffer.Length));

        SocketAsyncEventArgs args = receiveSocketAsyncEventArgsPool.Get();
        args.SetBuffer(memoryBuffer);
        args.SocketFlags = socketFlags;
        args.RemoteEndPoint = remoteEndPoint;
        args.UserToken = new AsyncReadToken(buffer, outputBuffer, tcs);
        args.Completed += HandleIOCompleted;

        if (socket.ReceiveFromAsync(args)) return tcs.Task;

        // Completed synchronously: the Completed event is not raised, so the handler is
        // removed here. Leaving it attached let handlers pile up on the pooled object.
        args.Completed -= HandleIOCompleted;

        SocketError socketError = args.SocketError;
        int bytesTransferred = args.BytesTransferred;
        EndPoint sender = args.RemoteEndPoint!;

        if (socketError == SocketError.Success)
        {
            args.MemoryBuffer.CopyTo(outputBuffer);
        }

        receiveBufferPool.Return(buffer);
        receiveSocketAsyncEventArgsPool.Return(args);

        if (socketError != SocketError.Success)
        {
            return Task.FromException<ReceiveResult>(new SocketException((int)socketError));
        }

        return Task.FromResult(new ReceiveResult(outputBuffer, bytesTransferred, sender));
    }

    /// <summary>Posts one send and returns a task for the number of bytes sent.</summary>
    /// <param name="remoteEndPoint">Destination of the datagram.</param>
    /// <param name="buffer">Payload; it is handed to the socket without copying.</param>
    /// <param name="socketFlags">Flags for the send operation.</param>
    /// <returns>
    /// A task yielding the byte count; it faults with a <see cref="SocketException"/>
    /// when the socket reports an error.
    /// </returns>
    private Task<int> SendAsync(EndPoint remoteEndPoint, Memory<byte> buffer, SocketFlags socketFlags)
    {
        TaskCompletionSource<int> tcs =
            new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);

        SocketAsyncEventArgs args = sendSocketAsyncEventArgsPool.Get();
        args.SetBuffer(buffer);
        args.SocketFlags = socketFlags;
        args.RemoteEndPoint = remoteEndPoint;
        args.UserToken = new AsyncWriteToken(buffer, tcs);
        args.Completed += HandleIOCompleted;

        if (socket.SendToAsync(args)) return tcs.Task;

        // Completed synchronously: no Completed event, so unsubscribe before pooling.
        args.Completed -= HandleIOCompleted;

        SocketError socketError = args.SocketError;
        int bytesTransferred = args.BytesTransferred;

        sendSocketAsyncEventArgsPool.Return(args);

        if (socketError != SocketError.Success)
        {
            return Task.FromException<int>(new SocketException((int)socketError));
        }

        return Task.FromResult(bytesTransferred);
    }

    /// <summary>State attached to a pending receive operation.</summary>
    private readonly struct AsyncReadToken
    {
        public readonly TaskCompletionSource<ReceiveResult> CompletionSource;

        public readonly Memory<byte> OutputBuffer;
        public readonly byte[] RentedBuffer;

        public AsyncReadToken(byte[] rentedBuffer, Memory<byte> outputBuffer, TaskCompletionSource<ReceiveResult> tcs)
        {
            RentedBuffer = rentedBuffer;
            OutputBuffer = outputBuffer;

            CompletionSource = tcs;
        }
    }

    /// <summary>State attached to a pending send operation.</summary>
    private readonly struct AsyncWriteToken
    {
        public readonly TaskCompletionSource<int> CompletionSource;

        public readonly Memory<byte> OutputBuffer;

        public AsyncWriteToken(Memory<byte> outputBuffer, TaskCompletionSource<int> tcs)
        {
            OutputBuffer = outputBuffer;

            CompletionSource = tcs;
        }
    }

    /// <summary>Creates an IPv4 datagram (UDP) server socket.</summary>
    public UdpServer() : base(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp)
    {
        clients = new ConcurrentDictionary<EndPoint, ConcurrentQueue<byte[]>>();
    }

    /// <summary>
    /// Runs the echo loop forever: receive one datagram, log it, send the buffer back to
    /// its sender, log the send.
    /// </summary>
    public override async Task StartAsync()
    {
        EndPoint nullEndPoint = new IPEndPoint(IPAddress.Any, 0);
        byte[] receiveBuffer = new byte[ReceiveResult.PacketSize];
        Memory<byte> receiveBufferMemory = new Memory<byte>(receiveBuffer);

        while (true)
        {
            ReceiveResult result = await ReceiveAsync(nullEndPoint, SocketFlags.None, receiveBufferMemory);

            Console.WriteLine($"[{result.RemoteEndPoint} > Server] : {Encoding.UTF8.GetString(result.Contents.Span)}");

            int sentBytes = await SendAsync(result.RemoteEndPoint, result.Contents, SocketFlags.None);
            Console.WriteLine($"[Server > {result.RemoteEndPoint}] Sent {sentBytes} bytes to {result.RemoteEndPoint}");
        }
    }
}

最佳答案

我设法解决了这个问题!

我最终不得不合并 SocketAsyncEventArgs 池,因为事实证明您需要在接收和发送调用期间保留一个 args 对象。现在,我的 SendToAsync 函数采用 SocketAsyncEventArgs 对象(在 ReceiveFromAsync 调用中租用),其中包含要向其发送响应的客户端的 RemoteEndPoint。 SendToAsync 函数用于清理 SocketAsyncEventArgs,并将它们返回到池中。

我早期解决方案的另一个问题是事件处理程序被多次订阅。合并两个 SocketAsyncEventArgs 池之后,代码里仍然残留着重复订阅事件处理程序的地方,这最终引发了问题。一旦解决了这个问题,该解决方案就可以完全按预期工作,并且可以毫无问题地发送 1 000 000 个数据包(1Kb)。非常初步的测试(可能略有偏差)显示带宽约为每秒 5 兆字节(大约每秒 40 兆比特),这是可以接受的,并且比我自己那个过于复杂的“快速异步”版本得到的结果要好得多。

关于带宽,我的快速异步版本过于复杂,因此无法真正进行比较,但我相信这个 SocketAsyncEventArgs 版本可以作为基准测试和修补的一个很好的起点,以尽可能多地从套接字中挤出性能。不过,我仍然希望对此提供反馈,并且可能会在某个时候将其发布到 Code Review 堆栈交换中,因为我怀疑解决方案中是否仍然存在细微的错误。

任何人都可以免费使用这段代码;它最终比预期的更简单,也更容易编写。但如果你傻到未经充分测试就把它用于生产环境,我不承担任何责任(毕竟这是一个学习项目)。

测试代码:

/// <summary>
/// Runs the echo benchmark: starts a <c>UdpServer</c> on 127.0.0.1:12345, then a
/// <c>UdpClient</c> bound to port 7007 that sends one million packets, waiting for the
/// echo of each one and printing progress every 10 000 packets.
/// </summary>
/// <returns>
/// A task that completes when the client has finished; the server loop is left running.
/// </returns>
private static async Task TestNetworking()
{
    EndPoint serverEndPoint = new IPEndPoint(IPAddress.Loopback, 12345);

    // The async local function is invoked directly, so it runs synchronously up to its
    // first pending await: the server socket is bound before the client is started. The
    // server loop never ends, so its task is deliberately not awaited; failures are
    // logged inside RunServerAsync.
    _ = RunServerAsync();

    // Task.Run unwraps the async lambda, so this await covers the whole client run
    // without blocking a thread on .Result.
    await Task.Run(() => RunClientAsync());

    async Task RunServerAsync()
    {
        try
        {
            SocketServer server = new UdpServer();

            if (server.Bind(serverEndPoint))
            {
                Console.WriteLine($"[Server] Bound server socket!");

                Console.WriteLine($"[Server] Starting server at {serverEndPoint}!");

                await server.StartAsync();
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex);
        }
    }

    async Task RunClientAsync()
    {
        SocketClient client = new UdpClient();
        bool bound = client.Bind(new IPEndPoint(IPAddress.Any, 7007));
        if (bound)
        {
            Console.WriteLine($"[Client] Bound client socket!");
        }

        if (!bound || !client.Connect(serverEndPoint))
        {
            return;
        }

        Console.WriteLine($"[Client] Connected to {serverEndPoint}!");

        // The client tracks the peer in its own local instead of reassigning the
        // captured serverEndPoint that the server code also reads.
        EndPoint remoteEndPoint = serverEndPoint;

        byte[] message = Encoding.UTF8.GetBytes("Hello World!");
        Memory<byte> messageBuffer = new Memory<byte>(message);

        byte[] response = new byte[ReceiveResult.PacketSize];
        Memory<byte> responseBuffer = new Memory<byte>(response);

        Stopwatch stopwatch = new Stopwatch();

        const int packetsToSend = 1_000_000, statusPacketThreshold = 10_000;

        Console.WriteLine($"Started sending packets (total packet count: {packetsToSend})");

        for (int i = 0; i < packetsToSend; i++)
        {
            if (i % statusPacketThreshold == 0)
            {
                Console.WriteLine($"Sent {i} packets out of {packetsToSend} ({((double)i / packetsToSend) * 100:F2}%)");
            }

            try
            {
                // Start() on a running stopwatch is a no-op, so time spent in a failed
                // round trip keeps accumulating until the next successful Stop().
                stopwatch.Start();

                await client.SendAsync(remoteEndPoint, messageBuffer, SocketFlags.None);

                ReceiveResult result = await client.ReceiveAsync(remoteEndPoint, SocketFlags.None, responseBuffer);

                remoteEndPoint = result.RemoteEndPoint;

                stopwatch.Stop();
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex);

                // Retry the same packet after a short pause.
                i--;
                await Task.Delay(1);
            }
        }

        double approxBandwidth = (packetsToSend * ReceiveResult.PacketSize) / (1_000_000.0 * (stopwatch.ElapsedMilliseconds / 1000.0));

        Console.WriteLine($"Sent {packetsToSend} packets of {ReceiveResult.PacketSize} bytes in {stopwatch.ElapsedMilliseconds:N} milliseconds.");
        Console.WriteLine($"Approximate bandwidth: {approxBandwidth} MBps");
    }
}

共享代码:
/// <summary>
/// State attached to a pending receive: the rented socket buffer, the caller's buffer,
/// the completion source of the operation's task, and the caller's cancellation token.
/// </summary>
internal readonly struct AsyncReadToken
{
    /// <summary>Token supplied by the caller of the receive.</summary>
    public readonly CancellationToken CancellationToken;

    /// <summary>Completion source backing the task returned to the caller.</summary>
    public readonly TaskCompletionSource<ReceiveResult> CompletionSource;

    /// <summary>Buffer rented for the socket operation.</summary>
    public readonly byte[] RentedBuffer;

    /// <summary>Caller-owned buffer associated with the operation.</summary>
    public readonly Memory<byte> UserBuffer;

    /// <summary>Creates a token from its components.</summary>
    /// <param name="rentedBuffer">Buffer rented for the socket operation.</param>
    /// <param name="userBuffer">Caller-owned buffer.</param>
    /// <param name="tcs">Completion source of the operation's task.</param>
    /// <param name="cancellationToken">Optional cancellation token.</param>
    public AsyncReadToken(byte[] rentedBuffer, Memory<byte> userBuffer, TaskCompletionSource<ReceiveResult> tcs,
        CancellationToken cancellationToken = default)
        => (RentedBuffer, UserBuffer, CompletionSource, CancellationToken) =
            (rentedBuffer, userBuffer, tcs, cancellationToken);
}

/// <summary>
/// State attached to a pending send: the rented socket buffer, the caller's buffer,
/// the completion source of the operation's task, and the caller's cancellation token.
/// </summary>
internal readonly struct AsyncWriteToken
{
    /// <summary>Token supplied by the caller of the send.</summary>
    public readonly CancellationToken CancellationToken;

    /// <summary>Completion source backing the task returned to the caller.</summary>
    public readonly TaskCompletionSource<int> CompletionSource;

    /// <summary>Buffer rented for the socket operation.</summary>
    public readonly byte[] RentedBuffer;

    /// <summary>Caller-owned buffer associated with the operation.</summary>
    public readonly Memory<byte> UserBuffer;

    /// <summary>Creates a token from its components.</summary>
    /// <param name="rentedBuffer">Buffer rented for the socket operation.</param>
    /// <param name="userBuffer">Caller-owned buffer.</param>
    /// <param name="tcs">Completion source of the operation's task.</param>
    /// <param name="cancellationToken">Optional cancellation token.</param>
    public AsyncWriteToken(byte[] rentedBuffer, Memory<byte> userBuffer, TaskCompletionSource<int> tcs,
        CancellationToken cancellationToken = default)
        => (RentedBuffer, UserBuffer, CompletionSource, CancellationToken) =
            (rentedBuffer, userBuffer, tcs, cancellationToken);
}

/// <summary>
/// Immutable description of one received datagram, together with the
/// <see cref="SocketAsyncEventArgs"/> object associated with the receive.
/// </summary>
public readonly struct ReceiveResult
{
    /// <summary>Size, in bytes, of the buffers used for a single packet.</summary>
    public const int PacketSize = 1024;

    /// <summary>Event-args object associated with the receive.</summary>
    public readonly SocketAsyncEventArgs ClientArgs;

    /// <summary>Buffer that holds the received data.</summary>
    public readonly Memory<byte> Contents;

    /// <summary>Number of bytes reported as received.</summary>
    public readonly int Count;

    /// <summary>Endpoint reported as the sender of the data.</summary>
    public readonly EndPoint RemoteEndPoint;

    /// <summary>Creates a result from its components.</summary>
    /// <param name="clientArgs">Event-args object associated with the receive.</param>
    /// <param name="contents">Buffer that holds the received data.</param>
    /// <param name="count">Number of bytes received.</param>
    /// <param name="remoteEndPoint">Endpoint the data was received from.</param>
    public ReceiveResult(SocketAsyncEventArgs clientArgs, Memory<byte> contents, int count, EndPoint remoteEndPoint)
        => (ClientArgs, Contents, Count, RemoteEndPoint) = (clientArgs, contents, count, remoteEndPoint);
}

服务器代码:
/// <summary>
/// Base class for socket servers. Owns the underlying <see cref="Socket"/> and releases
/// it when the server is disposed.
/// </summary>
public abstract class SocketServer : IDisposable
{
    /// <summary>The socket used by the server; created in the constructor.</summary>
    protected readonly Socket socket;

    /// <summary>Creates the underlying socket with the given parameters.</summary>
    /// <param name="addressFamily">Address family of the socket.</param>
    /// <param name="socketType">Type of the socket.</param>
    /// <param name="protocolType">Protocol of the socket.</param>
    protected SocketServer(AddressFamily addressFamily, SocketType socketType, ProtocolType protocolType)
    {
        socket = new Socket(addressFamily, socketType, protocolType);
    }

    /// <summary>Binds the socket to <paramref name="localEndPoint"/>.</summary>
    /// <param name="localEndPoint">Local endpoint to bind to.</param>
    /// <returns>
    /// <c>true</c> when the bind succeeded; <c>false</c> when it threw, in which case the
    /// exception is written to the console instead of being propagated.
    /// </returns>
    public bool Bind(in EndPoint localEndPoint)
    {
        try
        {
            socket.Bind(localEndPoint);

            return true;
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex);

            return false;
        }
    }

    /// <summary>Starts the server's processing loop.</summary>
    public abstract Task StartAsync();

    /// <summary>Releases the underlying socket.</summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>Releases the resources held by the server.</summary>
    /// <param name="disposing">
    /// <c>true</c> when called from <see cref="Dispose()"/>; only then is the managed
    /// socket disposed.
    /// </param>
    protected virtual void Dispose(bool disposing)
    {
        if (disposing)
        {
            socket.Dispose();
        }
    }
}

/// <summary>
/// UDP echo server that wraps <see cref="SocketAsyncEventArgs"/> operations in awaitable
/// tasks. One event-args object is used for a receive and then reused for the reply, so
/// the reply goes to the endpoint recorded by the receive.
/// </summary>
public class UdpServer : SocketServer
{
    private const int MaxPooledObjects = 1;
    private readonly ConcurrentDictionary<EndPoint, ConcurrentQueue<byte[]>> clients;

    private readonly ArrayPool<byte> receiveBufferPool =
        ArrayPool<byte>.Create(ReceiveResult.PacketSize, MaxPooledObjects);

    private readonly ArrayPool<byte> sendBufferPool =
        ArrayPool<byte>.Create(ReceiveResult.PacketSize, MaxPooledObjects);

    // Created in the constructor because its policy needs the instance's completion handler.
    private readonly ObjectPool<SocketAsyncEventArgs> socketAsyncEventArgsPool;

    /// <summary>
    /// Pool policy that subscribes the completion handler exactly once, when an object is
    /// created. Objects the pool creates on demand are therefore wired up like any other,
    /// and the handler is never subscribed a second time.
    /// </summary>
    private sealed class CompletionWiredPolicy : IPooledObjectPolicy<SocketAsyncEventArgs>
    {
        private readonly EventHandler<SocketAsyncEventArgs> completed;

        public CompletionWiredPolicy(EventHandler<SocketAsyncEventArgs> completed)
        {
            this.completed = completed;
        }

        public SocketAsyncEventArgs Create()
        {
            SocketAsyncEventArgs args = new SocketAsyncEventArgs();
            args.Completed += completed;

            return args;
        }

        public bool Return(SocketAsyncEventArgs obj) => true;
    }

    /// <summary>
    /// Completion callback for operations that finished asynchronously. Pooled resources
    /// are released before the operation's task is completed, so a continuation that
    /// immediately starts the next operation finds them available.
    /// </summary>
    /// <param name="sender">Event source; not used.</param>
    /// <param name="eventArgs">The event-args object whose operation completed.</param>
    private void HandleIOCompleted(object? sender, SocketAsyncEventArgs eventArgs)
    {
        // Snapshot the outcome before the object can be handed to another operation.
        SocketError socketError = eventArgs.SocketError;
        int bytesTransferred = eventArgs.BytesTransferred;

        switch (eventArgs.LastOperation)
        {
            case SocketAsyncOperation.SendTo:
            {
                AsyncWriteToken writeToken = (AsyncWriteToken)eventArgs.UserToken!;

                sendBufferPool.Return(writeToken.RentedBuffer, true);
                socketAsyncEventArgsPool.Return(eventArgs);

                if (writeToken.CancellationToken.IsCancellationRequested)
                {
                    writeToken.CompletionSource.TrySetCanceled();
                }
                else if (socketError != SocketError.Success)
                {
                    writeToken.CompletionSource.TrySetException(new SocketException((int)socketError));
                }
                else
                {
                    writeToken.CompletionSource.TrySetResult(bytesTransferred);
                }

                break;
            }

            case SocketAsyncOperation.ReceiveFrom:
            {
                AsyncReadToken readToken = (AsyncReadToken)eventArgs.UserToken!;
                bool canceled = readToken.CancellationToken.IsCancellationRequested;
                bool succeeded = !canceled && socketError == SocketError.Success;

                if (succeeded)
                {
                    eventArgs.MemoryBuffer.CopyTo(readToken.UserBuffer);
                }

                receiveBufferPool.Return(readToken.RentedBuffer, true);

                if (succeeded)
                {
                    // The event-args object travels with the result; SendAsync returns it
                    // to the pool once the reply has been sent.
                    readToken.CompletionSource.TrySetResult(new ReceiveResult(
                        eventArgs, readToken.UserBuffer, bytesTransferred, eventArgs.RemoteEndPoint!));
                }
                else
                {
                    // No reply will follow a failed receive, so the object is released here.
                    socketAsyncEventArgsPool.Return(eventArgs);

                    if (canceled)
                    {
                        readToken.CompletionSource.TrySetCanceled();
                    }
                    else
                    {
                        readToken.CompletionSource.TrySetException(new SocketException((int)socketError));
                    }
                }

                break;
            }

            case SocketAsyncOperation.Disconnect:
                // Placeholder: handle the client closing the connection on tcp servers at some point.
                break;

            case SocketAsyncOperation.Accept:
            case SocketAsyncOperation.Connect:
            case SocketAsyncOperation.None:
            case SocketAsyncOperation.Receive:
            case SocketAsyncOperation.ReceiveMessageFrom:
            case SocketAsyncOperation.Send:
            case SocketAsyncOperation.SendPackets:
                throw new NotImplementedException();

            default:
                throw new ArgumentOutOfRangeException();
        }
    }

    /// <summary>
    /// Posts one receive and returns a task for its result. The datagram is received into
    /// a rented buffer and copied into <paramref name="outputBuffer"/>.
    /// </summary>
    /// <param name="remoteEndPoint">Endpoint assigned to the operation before it is posted.</param>
    /// <param name="socketFlags">Flags for the receive operation.</param>
    /// <param name="outputBuffer">Caller-owned buffer that receives the data.</param>
    /// <param name="cancellationToken">
    /// Checked before the operation is posted and again when it completes; a pending
    /// receive is not interrupted by it.
    /// </param>
    /// <returns>
    /// A task yielding the result, whose <c>ClientArgs</c> must be passed to
    /// <c>SendAsync</c> so the event-args object gets back to the pool. The task faults
    /// with a <see cref="SocketException"/> when the socket reports an error.
    /// </returns>
    private Task<ReceiveResult> ReceiveAsync(EndPoint remoteEndPoint, SocketFlags socketFlags,
        Memory<byte> outputBuffer, CancellationToken cancellationToken = default)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            return Task.FromCanceled<ReceiveResult>(cancellationToken);
        }

        // Continuations must not run inline on the I/O completion callback.
        TaskCompletionSource<ReceiveResult> tcs =
            new TaskCompletionSource<ReceiveResult>(TaskCreationOptions.RunContinuationsAsynchronously);

        byte[] rentedBuffer = receiveBufferPool.Rent(ReceiveResult.PacketSize);

        // Never receive more than the caller's buffer can hold, so the copy cannot overflow.
        Memory<byte> memoryBuffer =
            new Memory<byte>(rentedBuffer, 0, Math.Min(rentedBuffer.Length, outputBuffer.Length));

        SocketAsyncEventArgs args = socketAsyncEventArgsPool.Get();

        args.SetBuffer(memoryBuffer);
        args.SocketFlags = socketFlags;
        args.RemoteEndPoint = remoteEndPoint;
        args.UserToken = new AsyncReadToken(rentedBuffer, outputBuffer, tcs, cancellationToken);

        // if the receive operation doesn't complete synchronously, returns the awaitable task
        if (socket.ReceiveFromAsync(args)) return tcs.Task;

        // Completed synchronously: the Completed event is not raised, so finish here.
        SocketError socketError = args.SocketError;

        if (socketError != SocketError.Success)
        {
            receiveBufferPool.Return(rentedBuffer, true);
            socketAsyncEventArgsPool.Return(args);

            return Task.FromException<ReceiveResult>(new SocketException((int)socketError));
        }

        args.MemoryBuffer.CopyTo(outputBuffer);

        ReceiveResult result = new ReceiveResult(args, outputBuffer, args.BytesTransferred, args.RemoteEndPoint!);

        receiveBufferPool.Return(rentedBuffer, true);

        return Task.FromResult(result);
    }

    /// <summary>
    /// Sends <paramref name="inputBuffer"/> to the endpoint stored in
    /// <paramref name="clientArgs"/>, then returns the event-args object to the pool.
    /// </summary>
    /// <param name="clientArgs">Event-args object obtained from a completed receive.</param>
    /// <param name="inputBuffer">Payload; it is copied into a rented buffer before sending.</param>
    /// <param name="socketFlags">Flags for the send operation.</param>
    /// <param name="cancellationToken">
    /// Checked before the operation is posted and again when it completes; a pending
    /// send is not interrupted by it.
    /// </param>
    /// <returns>
    /// A task yielding the number of bytes sent; it faults with a
    /// <see cref="SocketException"/> when the socket reports an error.
    /// </returns>
    private Task<int> SendAsync(SocketAsyncEventArgs clientArgs, Memory<byte> inputBuffer, SocketFlags socketFlags,
        CancellationToken cancellationToken = default)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            // The reply is abandoned, so the event-args object goes straight back to the pool.
            socketAsyncEventArgsPool.Return(clientArgs);

            return Task.FromCanceled<int>(cancellationToken);
        }

        TaskCompletionSource<int> tcs =
            new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);

        byte[] rentedBuffer = sendBufferPool.Rent(Math.Max(ReceiveResult.PacketSize, inputBuffer.Length));

        // Send exactly the payload, not the whole rented array.
        Memory<byte> memoryBuffer = new Memory<byte>(rentedBuffer, 0, inputBuffer.Length);

        inputBuffer.CopyTo(memoryBuffer);

        SocketAsyncEventArgs args = clientArgs;
        args.SetBuffer(memoryBuffer);
        args.SocketFlags = socketFlags;
        args.UserToken = new AsyncWriteToken(rentedBuffer, inputBuffer, tcs, cancellationToken);

        // if the send operation doesn't complete synchronously, return the awaitable task
        if (socket.SendToAsync(args)) return tcs.Task;

        // Completed synchronously: the Completed event is not raised, so finish here.
        SocketError socketError = args.SocketError;
        int result = args.BytesTransferred;

        sendBufferPool.Return(rentedBuffer, true);
        socketAsyncEventArgsPool.Return(args);

        if (socketError != SocketError.Success)
        {
            return Task.FromException<int>(new SocketException((int)socketError));
        }

        return Task.FromResult(result);
    }

    /// <summary>
    /// Creates an IPv4 datagram (UDP) server socket and the event-args pool whose objects
    /// are wired to <see cref="HandleIOCompleted"/>.
    /// </summary>
    public UdpServer() : base(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp)
    {
        clients = new ConcurrentDictionary<EndPoint, ConcurrentQueue<byte[]>>();

        socketAsyncEventArgsPool = new DefaultObjectPool<SocketAsyncEventArgs>(
            new CompletionWiredPolicy(HandleIOCompleted), MaxPooledObjects);
    }

    /// <summary>
    /// Runs the echo loop forever: receive one datagram, then send the received buffer
    /// back using the event-args object of that receive.
    /// </summary>
    public override async Task StartAsync()
    {
        EndPoint nullEndPoint = new IPEndPoint(IPAddress.Any, 0);
        byte[] receiveBuffer = new byte[ReceiveResult.PacketSize];
        Memory<byte> receiveBufferMemory = new Memory<byte>(receiveBuffer);

        while (true)
        {
            ReceiveResult result = await ReceiveAsync(nullEndPoint, SocketFlags.None, receiveBufferMemory);

            await SendAsync(result.ClientArgs, result.Contents, SocketFlags.None);
        }
    }
}

客户代码:
/// <summary>
/// Base class for socket clients. Owns the underlying <see cref="Socket"/> and releases
/// it when the client is disposed.
/// </summary>
public abstract class SocketClient : IDisposable
{
    /// <summary>The socket used by the client; created in the constructor.</summary>
    protected readonly Socket socket;

    /// <summary>Creates the underlying socket with the given parameters.</summary>
    /// <param name="addressFamily">Address family of the socket.</param>
    /// <param name="socketType">Type of the socket.</param>
    /// <param name="protocolType">Protocol of the socket.</param>
    protected SocketClient(AddressFamily addressFamily, SocketType socketType, ProtocolType protocolType)
    {
        socket = new Socket(addressFamily, socketType, protocolType);
    }

    /// <summary>Binds the socket to <paramref name="localEndPoint"/>.</summary>
    /// <param name="localEndPoint">Local endpoint to bind to.</param>
    /// <returns>
    /// <c>true</c> when the bind succeeded; <c>false</c> when it threw, in which case the
    /// exception is written to the console instead of being propagated.
    /// </returns>
    public bool Bind(in EndPoint localEndPoint)
    {
        try
        {
            socket.Bind(localEndPoint);

            return true;
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex);

            return false;
        }
    }

    /// <summary>Connects the socket to <paramref name="remoteEndPoint"/>.</summary>
    /// <param name="remoteEndPoint">Remote endpoint to connect to.</param>
    /// <returns>
    /// <c>true</c> when the connect succeeded; <c>false</c> when it threw, in which case
    /// the exception is written to the console instead of being propagated.
    /// </returns>
    public bool Connect(in EndPoint remoteEndPoint)
    {
        try
        {
            socket.Connect(remoteEndPoint);

            return true;
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex);

            return false;
        }
    }

    /// <summary>Receives data into <paramref name="outputBuffer"/>.</summary>
    /// <param name="remoteEndPoint">Endpoint passed to the receive operation.</param>
    /// <param name="socketFlags">Flags for the receive operation.</param>
    /// <param name="outputBuffer">Caller-owned buffer that receives the data.</param>
    public abstract Task<ReceiveResult> ReceiveAsync(EndPoint remoteEndPoint, SocketFlags socketFlags,
        Memory<byte> outputBuffer);

    /// <summary>Sends <paramref name="inputBuffer"/> to <paramref name="remoteEndPoint"/>.</summary>
    /// <param name="remoteEndPoint">Destination of the data.</param>
    /// <param name="inputBuffer">Payload to send.</param>
    /// <param name="socketFlags">Flags for the send operation.</param>
    public abstract Task<int> SendAsync(EndPoint remoteEndPoint, Memory<byte> inputBuffer, SocketFlags socketFlags);

    /// <summary>Releases the underlying socket.</summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>Releases the resources held by the client.</summary>
    /// <param name="disposing">
    /// <c>true</c> when called from <see cref="Dispose()"/>; only then is the managed
    /// socket disposed.
    /// </param>
    protected virtual void Dispose(bool disposing)
    {
        if (disposing)
        {
            socket.Dispose();
        }
    }
}

/// <summary>
/// UDP implementation of <c>SocketClient</c> built on the task-based socket methods.
/// </summary>
public class UdpClient : SocketClient
{
    /// <summary>Creates an IPv4 datagram (UDP) client socket.</summary>
    public UdpClient() : base(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp)
    {
    }

    /// <summary>
    /// Receives one datagram into a temporary buffer and copies that buffer into
    /// <paramref name="outputBuffer"/>.
    /// </summary>
    /// <param name="remoteEndPoint">Endpoint passed to the underlying receive call.</param>
    /// <param name="socketFlags">Flags passed to the underlying receive call.</param>
    /// <param name="outputBuffer">Caller-owned buffer that receives the data.</param>
    /// <returns>
    /// A result whose <c>ClientArgs</c> is <c>null</c>, whose <c>Contents</c> is
    /// <paramref name="outputBuffer"/>, and whose count and endpoint are those reported
    /// by the socket.
    /// </returns>
    public override async Task<ReceiveResult> ReceiveAsync(EndPoint remoteEndPoint, SocketFlags socketFlags,
        Memory<byte> outputBuffer)
    {
        // Sized to what the caller can hold; a full packet-sized buffer could not be
        // copied into a shorter outputBuffer.
        byte[] buffer = new byte[Math.Min(ReceiveResult.PacketSize, outputBuffer.Length)];

        SocketReceiveFromResult result =
            await socket.ReceiveFromAsync(new ArraySegment<byte>(buffer), socketFlags, remoteEndPoint);

        buffer.CopyTo(outputBuffer);

        return new ReceiveResult(default, outputBuffer, result.ReceivedBytes, result.RemoteEndPoint);
    }

    /// <summary>Sends <paramref name="buffer"/> to <paramref name="remoteEndPoint"/>.</summary>
    /// <param name="remoteEndPoint">Destination of the datagram.</param>
    /// <param name="buffer">Payload to send.</param>
    /// <param name="socketFlags">Flags passed to the underlying send call.</param>
    /// <returns>The number of bytes sent.</returns>
    public override async Task<int> SendAsync(EndPoint remoteEndPoint, Memory<byte> buffer, SocketFlags socketFlags)
    {
        // Array-backed memory is sent in place; only memory that is not backed by an
        // array still pays for a copy.
        if (!System.Runtime.InteropServices.MemoryMarshal.TryGetArray<byte>(buffer, out ArraySegment<byte> payload))
        {
            payload = new ArraySegment<byte>(buffer.ToArray());
        }

        return await socket.SendToAsync(payload, socketFlags, remoteEndPoint);
    }
}

关于c# - 用于 UDP 服务器的 C# 可等待 SocketAsyncEventArgs 包装器; socket.ReceiveAsync 返回无效的 RemoteEndPoint (0.0.0.0 :0),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60385048/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com