gpt4 book ai didi

c# - 网络流写入被阻止

转载 作者:可可西里 更新时间:2023-11-01 02:52:33 25 4
gpt4 key购买 nike

我正在开发一个接受来自不同客户端的多个 tcp 连接的 c# 应用程序 (.net 4)。有一个接受套接字的 tcp 监听器。双工通信黑白节点。数据使用 Networkstream.Write 方法发送,使用 Networkstream.read 方法读取。为每个 tcp 连接创建一个单独的线程。

问题是,几天前我们注意到其中一个客户端停止读取数据(由于错误)达 20 分钟。由于连接没有中断,服务器上没有 (IO) 异常。但是,我们注意到其他客户端的数据也没有运行。 20 分钟后,该客户端再次开始接收数据,很快其他客户端也开始接收数据。

我知道网络流的写入方法是一种阻塞方法,我们没有使用任何超时。因此,有可能写入已被阻止(描述为 here )。但据我了解,每个 tcp 连接都必须有一个单独的写缓冲区,或者还有更多的东西在起作用。 TCP 连接上的发送阻塞是否会影响同一应用程序中的其他 TCP 连接?

这里是写操作的伪代码。对于每个连接,一个单独的线程都有一个单独的传出队列进程。

public class TCPServerListener : baseConnection
{

private readonly int _Port;
private TcpListener _tcpListener;
private Thread _thread;
private List<TcpClientData> _tcpClientDataList = new List<TcpClientData>();
private long _messageDiscardTimeout;
private bool LoopForClientConnection = true;

public TCPServerListener(int port, ThreadPriority threadPriority)
{
try
{
// init property
}
catch (Exception ex)
{
// log
}
}

public void SendMessageToAll(int type)
{
base.EnqueueMessageToSend(type, _tcpClientDataList);
}
public void SendMessageToList(int type, IList<TcpClient> tcpClientList)
{
base.EnqueueMessageToSend(type, tcpClientList);
}
public void SendMessage(int type, TcpClient tcpClient)
{
base.EnqueueMessageToSend(type, tcpClient);
}



private void AcceptClientConnections()
{
while (LoopForClientConnection)
{
try
{
Socket socket = _tcpListener.AcceptSocket();
TcpClientData tcpClientData = new TcpClientData();
tcpClientData.tcpClientThread = new Thread(new ParameterizedThreadStart(StartAsync));
tcpClientData.tcpClientThread.Priority = _threadPriority;
tcpClientData.tcpClientThread.IsBackground = true;
tcpClientData.tcpClientThread.Name = "CD" + tcpClientData.tcpClientThread.ManagedThreadId;
tcpClientData.tcpClient = new TcpClient();
tcpClientData.tcpClient.Client = socket;
_tcpClientDataList.Add(tcpClientData);
tcpClientData.tcpClientThread.Start(tcpClientData.tcpClient);
}
catch (ThreadAbortException ex)
{
//log

}
catch (Exception ex)
{
//log
}
}
}




public override void Start()
{
base.Start();
_tcpListener = new TcpListener(System.Net.IPAddress.Any, _Port);

_thread = new Thread(AcceptClientConnections);
_thread.Priority = _threadPriority;
_thread.IsBackground = true;

_tcpListener.Start();
_thread.Start();
}

public override void Stop()
{
// stop listener and terminate threads
}
}


public class baseConnection
{
private Thread _InCommingThread;
private Thread _OutGoingThread;
protected ThreadPriority _threadPriority;
protected BlockingCollection<MessageReceived> _InComingMessageQueue = new BlockingCollection<MessageReceived>();
protected BlockingCollection<MessageToSend> _OutgoingMessageQueue = new BlockingCollection<MessageToSend>();

public void StartAsync(Object oTcpClient)
{
TcpClient tcpClient = oTcpClient as TcpClient;
if (tcpClient == null)
return;

using (tcpClient)
{
using (NetworkStream stream = tcpClient.GetStream())
{
stream.ReadTimeout = Timeout.Infinite;
stream.WriteTimeout = Timeout.Infinite;

BinaryReader bodyReader = new BinaryReader(stream);

while (tcpClient.Connected)
{
try
{
int messageType = bodyReader.ReadInt32();

// checks to verify messages

// enqueue message in incoming queue
_InComingMessageQueue.Add(new MessageReceived(messageType, tcpClient));
}
catch (EndOfStreamException ex)
{
// log
break;
}
catch (Exception ex)
{
// log
Thread.Sleep(100);
}
}
//RaiseDisconnected(tcpClient);
}
}
}


public virtual void Start()
{
_InCommingThread = new Thread(HandleInCommingMessnge);
_InCommingThread.Priority = _threadPriority;
_InCommingThread.IsBackground = true;
_InCommingThread.Start();

_OutGoingThread = new Thread(HandleOutgoingQueue);
_OutGoingThread.Priority = _threadPriority;
_OutGoingThread.IsBackground = true;
_OutGoingThread.Start();
}


public virtual void Stop()
{
// stop the threads and free up resources
}

protected void EnqueueMessageToSend(int type, List<TcpClientData> tcpClientDataList)
{
tcpClientDataList.ForEach(x => _OutgoingMessageQueue.Add(new MessageToSend(type, x.tcpClient)));
}
protected void EnqueueMessageToSend(int type, IList<TcpClient> tcpClientList)
{
foreach (TcpClient tcpClient in tcpClientList)
{
_OutgoingMessageQueue.Add(new MessageToSend(type, tcpClient));
}
}
protected void EnqueueMessageToSend(int type, TcpClient tcpClient)
{
_OutgoingMessageQueue.Add(new MessageToSend(type, tcpClient));
}


private void HandleOutgoingQueue()
{
while (true)
{
try
{

MessageToSend message = _OutgoingMessageQueue.Take();

if (message.tcpClient.Connected)
{
BinaryWriter writer = new BinaryWriter(message.tcpClient.GetStream());
writer.Write(message.type);
}
}
catch (ThreadAbortException ex)
{
// log
return;
}
catch (Exception ex)
{
//_logger.Error(ex.Message, ex);
}
}
}

private void HandleInCommingMessnge()
{
while (true)
{
try
{
MessageReceived messageReceived = _InComingMessageQueue.Take();

// handle message
}
catch (ThreadAbortException ex)
{
// log
return;
}
catch (Exception ex)
{
// log
//_logger.Error(ex.Message, ex);
}
}
}

public class MessageReceived
{
public MessageReceived(int type, TcpClient tcpClient)
{
this.tcpClient = tcpClient;
this.type = type;
}

public int type;
public TcpClient tcpClient;
}

public class MessageToSend
{
public MessageToSend(int type, TcpClient tcpClient)
{
this.tcpClient = tcpClient;
this.type = type;
}

public int type;
public TcpClient tcpClient;
}

public class TcpClientData
{
public Thread tcpClientThread;
public TcpClient tcpClient;
}
}

最佳答案

您提到为每个连接创建一个单独的线程,但您显示的代码似乎能够为任何 连接使消息出队。

如果此代码在多个线程上运行,程序将在每个线程当前都尝试向阻塞连接发送消息时立即阻塞。如果此循环在多个线程上运行,您可能会遇到的另一个问题是消息可能不会以正确的顺序到达同一连接。

关于c# - 网络流写入被阻止,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/14251948/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com