gpt4 book ai didi

c# - SocketAsyncEventArgs UDP 服务器缓冲区偶尔充满零

转载 作者:行者123 更新时间:2023-11-30 18:35:33 31 4
gpt4 key购买 nike

我遇到了与《SocketAsyncEventArgs buffer is full of zeroes》中所描述的类似的问题:我实现的 SocketAsyncEventArgs UDP 服务器会收到具有以下特征的数据包:

  1. SocketAsyncEventArgs.BytesTransferred > 0
  2. SocketAsyncEventArgs.Buffer 中没有数据

这种情况只在高负载下偶尔发生,并且可以在 3 台独立的机器上重现。我猜这也是我另一个尚未解决的 SO 问题(FatalExecutionEngineError Detected when unhooking event)的根本原因。

到目前为止,UDP 服务器的实现如下:

/// <summary>
/// Provides a basic implementation of a UDP socket based on <see cref="ISocket"/>.
/// Datagrams are received with pooled <see cref="SocketAsyncEventArgs"/> instances and
/// pooled byte buffers obtained from an <see cref="IBufferManagerProvider"/>. Each datagram
/// is expected to start with a length prefix, followed by at least that many payload bytes;
/// the payload is copied out and published through <see cref="OnDataReceived"/>.
/// </summary>
public class UDPSocket : ISocket
{
    #region "Private Variables"
    private Socket _socket;
    private IBufferManagerProvider _bufferManager;
    #endregion

    #region "Public Properties"
    /// <summary>The local UDP port the socket binds to.</summary>
    public Int32 Port { get; private set; }

    /// <summary>
    /// Number of bytes at the start of every datagram that hold the payload length.
    /// The prefix is decoded with <see cref="BitConverter.ToInt32(byte[], int)"/>, so it must be
    /// at least 4 bytes long. Note that <see cref="Start"/> resets this value to 4.
    /// </summary>
    public Int32 MessagePrefixLength { get; set; }

    /// <summary>The local address the socket binds to.</summary>
    public IPAddress ListeningAddress { get; private set; }

    /// <summary>The logger used for diagnostics.</summary>
    public ILogProvider LogProvider { get; private set; }

    /// <summary>
    /// Value passed to the ReuseAddress socket option in <see cref="Start"/>.
    /// Nothing in this class assigns it, so it keeps its default of <c>false</c>.
    /// </summary>
    public bool AllowAddressReuse { get; private set; }
    #endregion

    #region "Constructors"
    private UDPSocket() { }

    /// <summary>Listens on <paramref name="listeningAddress"/>, port 4444, with default buffer manager and logger.</summary>
    public UDPSocket(String listeningAddress) : this(listeningAddress, 4444, null, null) { }

    /// <summary>Listens on <paramref name="listeningAddress"/>:<paramref name="port"/> with default buffer manager and logger.</summary>
    public UDPSocket(String listeningAddress, Int32 port) : this(listeningAddress, port, null, null) { }

    /// <summary>Listens on 0.0.0.0:<paramref name="port"/> with default buffer manager and logger.</summary>
    public UDPSocket(Int32 port) : this("0.0.0.0", port, null, null) { }

    /// <summary>
    /// Creates a socket wrapper; no OS socket is created until <see cref="Start"/> is called.
    /// </summary>
    /// <param name="listeningAddress">Textual IP address to bind to; must not be null or empty.</param>
    /// <param name="port">Port to bind to, between 1 and 65535.</param>
    /// <param name="manager">Pool of buffers and event args; a <c>DefaultBufferManager(100, 2048, null, null)</c> is used when null.</param>
    /// <param name="logger">Logger; a <c>DefaultLogProvider(LogLevel.None)</c> is used when null.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="port"/> is outside 1..65535.</exception>
    /// <exception cref="ArgumentException"><paramref name="listeningAddress"/> is null or empty.</exception>
    /// <exception cref="FormatException"><paramref name="listeningAddress"/> is not a valid IP address.</exception>
    public UDPSocket(String listeningAddress, Int32 port, IBufferManagerProvider manager, ILogProvider logger)
    {
        // Validate the port. ArgumentOutOfRangeException derives from ArgumentException,
        // so callers that caught the previous exception type keep working.
        if (port <= 0 || port > IPEndPoint.MaxPort)
        {
            throw new ArgumentOutOfRangeException("port", port, "Port number must be between 1 and 65535.");
        }
        this.Port = port;

        // Validate the IP address
        if (String.IsNullOrEmpty(listeningAddress))
        {
            throw new ArgumentException("The listening address supplied is not valid.", "listeningAddress");
        }
        this.ListeningAddress = IPAddress.Parse(listeningAddress);

        // Fall back to default collaborators when none are supplied
        this.LogProvider = (logger == null) ? new DefaultLogProvider(LogLevel.None) : logger;
        _bufferManager = (manager == null) ? new DefaultBufferManager(100, 2048, null, null) : manager;

        // use a default message prefix
        this.MessagePrefixLength = 4;
    }
    #endregion

    #region "Event Handlers"
    #endregion

    #region "Internal handler methods"
    /// <summary>
    /// Posts a receive operation. When the operation completes synchronously the packet is
    /// processed inline and another receive is posted; this is done in a loop rather than by
    /// recursion so a burst of synchronous completions cannot grow the stack without bound.
    /// Returns once a receive is pending or the socket can no longer receive.
    /// </summary>
    private void Receive()
    {
        while (true)
        {
            SocketAsyncEventArgs args = _bufferManager.TakeNextSocketAsyncEventArgs();
            byte[] buff = _bufferManager.TakeNextBuffer();
            args.SetBuffer(buff, 0, buff.Length);
            args.Completed += PacketReceived;
            args.RemoteEndPoint = new IPEndPoint(IPAddress.Any, this.Port);

            bool pending;
            try
            {
                pending = _socket.ReceiveMessageFromAsync(args);
            }
            catch (ObjectDisposedException)
            {
                // Expected once Stop() has closed the socket. Hand the pooled resources back
                // instead of leaking them, and stop receiving.
                ReleaseArgs(args);
                return;
            }
            catch (Exception ex)
            {
                // Posting a receive stays best-effort (it never throws to the caller),
                // but the failure is logged and the pooled resources are returned.
                this.LogProvider.Log("Unable to start receiving - " + ex.Message, "UDPSocket.Receive", LogLevel.Minimal);
                ReleaseArgs(args);
                return;
            }

            if (pending)
            {
                // PacketReceived will be raised when the operation completes.
                return;
            }

            // Completed synchronously: the Completed event is not raised, handle it here.
            ProcessPacket(args);
        }
    }

    /// <summary>Completion callback for asynchronously completed receives.</summary>
    private void PacketReceived(object sender, SocketAsyncEventArgs e)
    {
        OnPacketReceived(e);
    }

    /// <summary>
    /// Handles an asynchronously completed receive: posts the next receive, then processes
    /// the completed packet.
    /// </summary>
    private void OnPacketReceived(SocketAsyncEventArgs e)
    {
        if (e.SocketError == SocketError.OperationAborted)
        {
            // The socket was closed while this receive was pending; there is no packet to
            // process and no point in posting another receive.
            ReleaseArgs(e);
            return;
        }

        // Start a new Receive operation straight away
        Receive();

        // Now process the packet that we have already
        ProcessPacket(e);
    }

    /// <summary>
    /// Validates a completed receive, copies the payload out of the pooled buffer, returns
    /// the pooled resources and raises <see cref="OnDataReceived"/>.
    /// </summary>
    private void ProcessPacket(SocketAsyncEventArgs e)
    {
        if (e.SocketError != SocketError.Success)
        {
            this.LogProvider.Log(String.Format("Socket error {0} while receiving. Discarding packet.", e.SocketError), "UDPSocket.OnPacketReceived", LogLevel.Minimal);
            ReleaseArgs(e);
            return;
        }

        if (e.BytesTransferred <= MessagePrefixLength)
        {
            // Error condition, empty packet
            this.LogProvider.Log(String.Format("Empty packet received from {0}. Discarding packet.", DescribePacketAddress(e)), "UDPSocket.OnPacketReceived", LogLevel.Minimal);
            ReleaseArgs(e);
            return;
        }

        // Get the message length from the beginning of the packet.
        byte[] arrPrefix = new byte[MessagePrefixLength];
        Buffer.BlockCopy(e.Buffer, e.Offset, arrPrefix, 0, MessagePrefixLength);
        Int32 messageLength = BitConverter.ToInt32(arrPrefix, 0);

        // the number of payload bytes actually received
        Int32 bytesToProcess = e.BytesTransferred - MessagePrefixLength;

        if (messageLength < 0)
        {
            // The prefix comes from the network; a negative value would otherwise reach
            // "new byte[messageLength]" and throw.
            this.LogProvider.Log(String.Format("Invalid message length from {0}. Discarding packet.", DescribePacketAddress(e)), "UDPSocket.OnPacketReceived", LogLevel.Minimal);
            ReleaseArgs(e);
            return;
        }

        if (bytesToProcess < messageLength)
        {
            this.LogProvider.Log(String.Format("Missing data from {0}. Discarding packet.", DescribePacketAddress(e)), "UDPSocket.OnPacketReceived", LogLevel.Minimal);
            ReleaseArgs(e);
            return;
        }

        // Copy the payload into its own array so the pooled buffer can be recycled
        byte[] data = new byte[messageLength];
        Buffer.BlockCopy(e.Buffer, e.Offset + MessagePrefixLength, data, 0, messageLength);

        // Data is safely stored, so unhook the event and return the SocketAsyncEventArgs back to the pool
        ReleaseArgs(e);

        // Thread safe way of triggering the event
        var evnt = OnDataReceived;

        if (evnt != null)
        {
            // NOTE(review): the sender is the SocketAsyncEventArgs that has just been returned
            // to the pool, so it may already be reused by another receive. Kept for backward
            // compatibility; handlers should rely on the event data only.
            evnt(e, new SocketDataEventArgs(data));
        }
    }

    /// <summary>
    /// Returns a printable address for log messages, tolerating a completion that carries
    /// no packet information.
    /// </summary>
    private static String DescribePacketAddress(SocketAsyncEventArgs e)
    {
        IPAddress address = e.ReceiveMessageFromPacketInfo.Address;
        if (address != null)
        {
            return address.ToString();
        }

        return (e.RemoteEndPoint != null) ? e.RemoteEndPoint.ToString() : "unknown";
    }

    /// <summary>
    /// Unhooks the completion handler and returns the buffer and the event args to the pool.
    /// </summary>
    private void ReleaseArgs(SocketAsyncEventArgs e)
    {
        e.Completed -= PacketReceived;

        // Capture the buffer BEFORE the args go back to the pool. Once the args are pooled,
        // another thread can take them and call SetBuffer with a different buffer; reading
        // e.Buffer after that point would recycle a buffer that is still in use by a pending
        // receive (and leak the one that was actually finished with).
        byte[] buffer = e.Buffer;

        if (buffer != null)
        {
            _bufferManager.InsertBuffer(buffer);
        }
        _bufferManager.InsertSocketAsyncEventArgs(e);
    }
    #endregion

    #region "ISocket implicit implementation"
    /// <summary>
    /// Creates the UDP socket, enables packet information, binds it to
    /// <see cref="ListeningAddress"/>:<see cref="Port"/> and starts receiving.
    /// </summary>
    public void Start()
    {
        this.LogProvider.Log("Starting. Creating socket", "UDPSocket.Start", LogLevel.Verbose);
        _socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
        _socket.SetSocketOption(SocketOptionLevel.IP, SocketOptionName.PacketInformation, true);
        _socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, this.AllowAddressReuse);
        _socket.Bind(new IPEndPoint(this.ListeningAddress, this.Port));

        // use a default message prefix
        // NOTE(review): this overrides any value assigned to MessagePrefixLength before Start().
        this.MessagePrefixLength = 4;

        // begin receiving packets
        Receive();

        this.LogProvider.Log("Socket created. Listening for packets", "UDPSocket.Start", LogLevel.Verbose);
    }

    /// <summary>
    /// Shuts down and closes the socket. Does nothing if <see cref="Start"/> was never called.
    /// </summary>
    public void Stop()
    {
        if (_socket == null)
        {
            // Never started; nothing to close.
            return;
        }

        // do a shutdown before you close the socket
        try
        {
            _socket.Shutdown(SocketShutdown.Both);
            this.LogProvider.Log("Clean socket shutdown", "UDPSocket.Stop", LogLevel.Verbose);
        }
        // throws if socket was already closed
        catch (Exception ex)
        {
            this.LogProvider.Log("Error closing socket - " + ex.Message, "UDPSocket.Stop", LogLevel.Minimal);
        }

        // Close the socket, which calls Dispose internally
        _socket.Close();
        this.LogProvider.Log("Socket closed", "UDPSocket.Stop", LogLevel.Verbose);
    }

    /// <summary>Raised with a private copy of the payload of every valid datagram.</summary>
    public event EventHandler<SocketDataEventArgs> OnDataReceived;
    #endregion
}

如果需要,我可以制作完整的服务器/客户端演示,并发布其他类,例如缓冲区管理器。这是我第一次使用 SocketAsyncEventArgs,所以我的实现可能不是 100% 正确

最佳答案

我想我找到了问题所在。我不再使用 Stack<T> 来管理缓冲区,而是改用了 Queue<T>,这大大减少了问题的出现。对于感兴趣的人,下面是我的缓冲区队列实现:

/// <summary>
/// A bounded queue guarded by a semaphore: <see cref="TakeNext"/> blocks the caller while
/// the queue is depleted, and <see cref="Insert"/> wakes one blocked caller.
/// </summary>
/// <typeparam name="T">The type of item held by the queue</typeparam>
public sealed class ManagedQueue<T>
{
    #region "Constructors"
    /// <summary>
    /// Creates the queue.
    /// </summary>
    /// <param name="capacity">Maximum number of items the queue may hold; must be positive.</param>
    /// <param name="fillQueue">
    /// When true, the queue is topped up to <paramref name="capacity"/> with <c>default(T)</c>.
    /// This only works when <c>default(T)</c> is not null; otherwise <see cref="Insert"/>
    /// rejects the item.
    /// </param>
    /// <param name="queue">
    /// Optional backing queue. Items it already contains are immediately available to
    /// <see cref="TakeNext"/>.
    /// </param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is not positive.</exception>
    /// <exception cref="ArgumentException"><paramref name="queue"/> already holds more than <paramref name="capacity"/> items.</exception>
    public ManagedQueue(Int32 capacity = 400, bool fillQueue = false, Queue<T> queue = null)
    {
        if (capacity <= 0)
        {
            throw new ArgumentOutOfRangeException("capacity", capacity, "The capacity must be greater than zero");
        }

        if (queue != null && queue.Count > capacity)
        {
            throw new ArgumentException("The supplied queue holds more items than the capacity allows", "queue");
        }

        Capacity = capacity;
        _queue = queue ?? new Queue<T>(capacity);

        // The semaphore count must always equal the number of queued items, so it starts at
        // the number of items already present (zero for a fresh queue).
        _restrictor = new SemaphoreSlim(_queue.Count, Capacity);

        if (fillQueue)
        {
            // Top the queue up with default values; each Insert releases the semaphore once.
            for (Int32 i = _queue.Count; i < Capacity; i++)
            {
                Insert(default(T));
            }
        }
    }
    #endregion

    /// <summary>
    /// Gets the defined over all queue Capacity
    /// </summary>
    public Int32 Capacity { get; private set; }

    // The queue to hold the items
    private readonly Queue<T> _queue;

    // The SemaphoreSlim to restrict access to the queue; its count mirrors _queue.Count
    private readonly SemaphoreSlim _restrictor;

    /// <summary>
    /// Take the next resource available from the queue. This blocks while the queue is empty.
    /// </summary>
    /// <returns>The next resource available</returns>
    /// <exception cref="InvalidOperationException">The semaphore and the queue are out of step.</exception>
    public T TakeNext()
    {
        // make us wait if necessary
        _restrictor.Wait();

        lock (_queue)
        {
            if (_queue.Count > 0)
            {
                return _queue.Dequeue();
            }
            throw new InvalidOperationException("There has been a Semaphore/queue offset");
        }
    }

    /// <summary>
    /// Adds an item to the queue. This will release one thread blocked in <see cref="TakeNext"/>.
    /// </summary>
    /// <param name="item">The item to add; must not be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="item"/> is null.</exception>
    /// <exception cref="SemaphoreFullException">The queue already holds <see cref="Capacity"/> items.</exception>
    public void Insert(T item)
    {
        // Sanity Check
        if (item == null)
        {
            throw new ArgumentNullException("item", "The item cannot be null");
        }

        lock (_queue)
        {
            // Release first: if the queue is already at capacity this throws before the item
            // is enqueued, so the queue and the semaphore cannot drift apart. A consumer woken
            // by the release still has to take this lock, so it cannot observe the queue
            // before the item has been added.
            _restrictor.Release();
            _queue.Enqueue(item);
        }
    }
}

关于c# - SocketAsyncEventArgs UDP 服务器缓冲区偶尔充满零,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/14858952/

31 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com