gpt4 book ai didi

c# - 如何在 C# 应用程序中取消异步发送套接字

转载 作者:太空宇宙 更新时间:2023-11-03 13:11:25 25 4
gpt4 key购买 nike

我有一个应用程序,它只是从队列中提取项目,然后尝试通过网络套接字异步发送它们。

当出现问题或客户端中止主机套接字时,我遇到了一些问题。

这是我的一些代码:我认为它可能比我的话解释得更好:

这是我的 SocketState.cs 类,它包含套接字和相关信息:

public class SocketState
{
public const int BufferSize = 256;
public Socket WorkSocket { get; set; }
public byte[] Buffer { get; set; }

/// <summary>
/// Constructor receiving a socket
/// </summary>
/// <param name="socket"></param>
public SocketState(Socket socket)
{
WorkSocket = socket;
Buffer = new byte[BufferSize];
}
}

这是我的 SocketHandler.cs 类,它控制大部分套接字操作:

public class SocketHandler : IObserver
{
# region Class Variables
private static readonly log4net.ILog Log = log4net.LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);
private SocketState _state;
private OutgoingConnectionManager _parentConnectionManager;
private int _recieverId;
private readonly ManualResetEvent _sendDone = new ManualResetEvent(false);

public volatile bool NameIsSet = false;
private ManualResetEvent _receiveDone = new ManualResetEvent(false);
public String Name;
public readonly Guid HandlerId;
# endregion Class Variables

/// <summary>
/// Constructor
/// </summary>
public SocketHandler(SocketState state)
{
HandlerId = Guid.NewGuid();
_state = state;
_state.WorkSocket.BeginReceive(_state.Buffer, 0, SocketState.BufferSize, 0, new AsyncCallback(ReceiveCallback), this._state);
}

/// <summary>
/// Set the receiver id for this socket.
/// </summary>
public void SetReceiverId(int receiverId)
{
_recieverId = receiverId;
}

/// <summary>
/// Stops / closes a connection
/// </summary>
public void Stop()
{
CloseConnection();
}

/// <summary>
/// Used to set this connections parent connection handler
/// </summary>
/// <param name="conMan"></param>
public void SetConnectionManager(OutgoingConnectionManager conMan)
{
_parentConnectionManager = conMan;
}

/// <summary>
/// Returns if the socket is connected or not
/// </summary>
/// <returns></returns>
public bool IsConnected()
{
return _state.WorkSocket.Connected;
}

/// <summary>
/// Public Class that implements the observer interface , this function provides a portal to receive new messages which it must send
/// </summary>
/// <param name="e"> Event to execute</param>
public void OnMessageRecieveEvent(ObserverEvent e)
{
SendSignalAsync(e.Message.payload);
}
# region main working space

# region CallBack Functions
/// <summary>
/// CallBack Function that is called when a connection Receives Some Data
/// </summary>
/// <param name="ar"></param>
private void ReceiveCallback(IAsyncResult ar)
{
try
{
String content = String.Empty;
if (ar != null)
{
SocketState state = (SocketState)ar.AsyncState;
if (state != null)
{
Socket handler = state.WorkSocket;
if (handler != null)
{
int bytesRead = handler.EndReceive(ar);

if (bytesRead > 0)
{
StringBuilder Sb = new StringBuilder();
Sb.Append(Encoding.Default.GetString(state.Buffer, 0, bytesRead));

if (Sb.Length > 1)
{
content = Sb.ToString();

foreach (var s in content.Split('Ÿ'))
{
if (string.Compare(s, 0, "ID", 0, 2) == 0)
{
Name = s.Substring(2);
NameIsSet = true;
}

if (string.Compare(s, 0, "TG", 0, 2) == 0)
{
LinkReplyToTag(s.Substring(2), this.Name);
}
}
_state.WorkSocket.BeginReceive(_state.Buffer, 0, SocketState.BufferSize, 0,
new AsyncCallback(ReceiveCallback), _state);
}
}
}
}
}
}
catch
{
CloseConnection();
}
}

/// <summary>
/// Call Back Function called when data is send though this connection
/// </summary>
/// <param name="asyncResult"></param>
private void SendCallback(IAsyncResult asyncResult)
{
try
{
if (asyncResult != null)
{
Socket handler = (Socket)asyncResult.AsyncState;
if (handler != null)
{
int bytesSent = handler.EndSend(asyncResult);
// Signal that all bytes have been sent.
_sendDone.Set();
if (bytesSent > 0)
{
return;
}
}
}
}
catch (Exception e)
{
Log.Error("Transmit Failed On Send CallBack");
}
//Close socket as something went wrong
CloseConnection();
}
# endregion CallBack Functions

/// <summary>
/// Sends a signal out of the current connection
/// </summary>
/// <param name="signal"></param>
private void SendSignalAsync(Byte[] signal)
{
try
{
if (_state != null)
{
if (_state.WorkSocket != null)
{
if (_state.WorkSocket.Connected)
{
try
{
_sendDone.Reset();
_state.WorkSocket.BeginSend(signal, 0, signal.Length, 0, new AsyncCallback(SendCallback),
_state.WorkSocket);
_sendDone.WaitOne(200);
return;
}
catch (Exception e)
{
Log.Error("Transmission Failier for IP: " + ((IPEndPoint)_state.WorkSocket.RemoteEndPoint).Address, e);
}
}
}
}
//Close Connection as something went wrong
CloseConnection();
}
catch (Exception e)
{
Log.Error("An Exception has occurred in the SendSignalAsync function", e);
}
}


/// <summary>
/// Call this to Close the connection
/// </summary>
private void CloseConnection()
{
try
{
var ip = "NA";
try
{
if (_state != null)
{
ip = ((IPEndPoint)_state.WorkSocket.RemoteEndPoint).Address.ToString();
}
}
catch
{
//Cannot get the ip address
}
OutgoingListeningServer.UpdateRecieversHistory(_recieverId, ip, "Disconnected");

try
{
if (_state != null)
{
if (_state.WorkSocket != null)
{
_state.WorkSocket.Shutdown(SocketShutdown.Both);
_state.WorkSocket.Close();
//_state.WorkSocket.Dispose();
_state.WorkSocket = null;
_state = null;
}
}
}
catch (Exception e)
{
_state = null;
Log.Error("Error while trying to close socket for IP: " + ip, e);
}
if (_parentConnectionManager != null)
{
// Remove this connection from the connection list
_parentConnectionManager.ConnectionRemove(this);
}
}
catch (Exception e)
{
Log.Error("A major error occurred in the close connection function, outer try catch was hit", e);
}
}
# endregion main working space
}

这是我的线程,它将调用 SocketHandler.cs 类中的 OnMessageRecieveEvent() 函数。

  private void Main()
{
Log.Info("Receiver" + ReceiverDb.Id + " Thread Starting");
// Exponential back off
var eb = new ExponentialBackoff();
try
{
while (_run)
{
try
{
if (ReceiverOutgoingConnectionManager.HasConnectedClient())
{
//Fetch Latest Item
ILogItem logItem;
if (_receiverQueue.TryDequeue(out logItem))
{
//Handle the logItem
**calls the OnMessageRecieveEvent() for all my connections.
ReceiverOutgoingConnectionManager.SendItemToAllConnections(logItem);
//Reset the exponential back off counter
eb.reset();
}
else
{
//Exponential back off thread sleep
eb.sleep();
}
}
else
{
//Exponential back off thread sleep
eb.sleep();
}
}
catch (Exception e)
{
Log.Error("Error occurred in " + ReceiverDb.Id + "receiver mains thread ", e);
}
}
}
catch (Exception e)
{
Log.Error(" ** An error has occurred in a receiver holder main thread ** =====================================================================>", e);
}
Log.Info("Receiver" + ReceiverDb.Id + " Thread Exiting ** ===============================================================================>");
}

我为这么多代码道歉。但我担心这可能不是很明显,所以我发布了所有相关代码。

现在进一步解释我的问题。如果套接字发生错误。我分配了 Transmit Failed On Send CallBack。这对我来说意味着我没有正确关闭套接字,并且仍然有未完成的回调正在执行。

有没有办法在我关闭套接字时取消所有未完成的回调?

我也确信我发布的代码会有一些建议/问题。我非常感谢您的反馈。

最佳答案

我假设这是出于学习目的,因为为其他目的编写自己的网络代码有点……困难

在发送回调中获取异常很好。这就是异常(exception)的用途。但是,您需要识别异常,而不是仅仅捕捉一揽子Exception 并几乎忽略其中的所有信息。

在适合处理异常的地方处理你能处理的异常。

我不会说得太具体,因为您的代码存在很多问题。但关键之一是忽略正确的套接字处理 - 当您收到 0 字节时,这意味着您应该关闭套接字。那是来自另一方的信号,说“我们完成了”。忽略这一点,您将使用(可能部分)关闭的连接。

请使用一些可以为您提供所需保证(和简单性)的网络库。 WCF、Lindgren 等等。例如,TCP 不保证您会在一部分中收到消息,因此您的消息解析代码不可靠。您需要使用一些消息框架,您需要添加适当的错误处理... Socket 不是高级构造,它不会“自动”工作,您需要实现所有这些自己的东西。

即使忽略网络代码本身,很明显您只是忽略了异步代码的大部分复杂性。 SendDone 怎么了?要么你想使用异步代码,然后摆脱同步性,要么你想要同步代码,然后你为什么要使用异步套接字?

关于c# - 如何在 C# 应用程序中取消异步发送套接字,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28448442/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com