gpt4 book ai didi

c# - .NET 中的异步读取问题 - 似乎我的回调在不应该被调用时被多次调用

转载 作者:太空狗 更新时间:2023-10-29 20:36:54 24 4
gpt4 key购买 nike

基本上,我正在为一个设计为非常快速、小巧和健壮的系统编写一些代码。我从 TcpListener 的一些异步示例开始。和 TcpClient并写了一个Server和Client类,基本上在我的项目中多处使用。

基本上我的服务器类(代码将在后面)都是基于事件的,客户端代码也是如此。当我收到数据包 - 一个接一个 - 通过服务器或客户端套接字进入时 - 一切正常。

然而,如果发送方——例如 A 类使用客户端类——通过 TCP 流向 B 类中的服务器类发送一堆数据包。当然,服务器类可能将所有数据包作为一个大集水器。所以当数据接收事件的回调发生时,我抓取缓冲区然后处理它。

这就是有趣的事情发生的地方。我的问题不是从大缓冲区中拆分所有数据包。我的问题是,由于某种我无法理解的原因.. 假设我从客户端向服务器发送 5 个数据包(反之亦然),另一方得到全部 5 个数据包。 datarecieve 事件触发,然后窃贼被捕获,所有 5 个数据包数据包在那里。他们被处理。但随后事件再次触发..

换句话说,不是事件触发一次,而是为 5 个单独的数据包触发 5 次,我最终处理了包含这 5 个数据包的缓冲区 5 次。

由于我正在设计分布式网络,这意味着模块与之通信的节点(模块(客户端类)<--> 节点(服务器类)<--> 客户端(客户端类))获得 25 个数据包而不是 5 个. 然后它将这些转发到目的地,得到 25*5 或 125 个数据包。

我很确定我在这里遗漏了一些明显的东西。我试过想办法让事件只触发一次..我最终可能会认输并重写 Server 和 Client 类,以便它们同步并且每个客户端实例(或在服务器端)有一个线程,一个接受的线程,每个客户端连接一个线程) - 这样我就可以更好地处理数据流。 IE。数据包进入,如果它的整个过程。如果不是,请等待它完整等等。使用典型的开始/结束特殊字节等等。

服务器类 - 大部分都在那里。去掉了一些不相关的,比如 KillClient 等。

   public class Server
{
private TcpListener serverListener;
private List<ServerClient> clients;

#region Callbacks


public delegate void incomingDataCallback(byte[] buffer, string clientID, TcpClient tcpClient);
public incomingDataCallback incomingData = null;

public delegate void incomingConnectionCallback(string clientID, TcpClient tcpClient);
public incomingConnectionCallback incomingConnection = null;

public delegate void connectionClosedCallback(string clientID, TcpClient tcpClient);
public connectionClosedCallback connectionClosed = null;

public delegate void dataWrittenCallback(string clientID, TcpClient tcpClient);
public dataWrittenCallback dataWritten = null;


#endregion

// Constructor
public Server(string listenIP, int listenPort)
{
// Create a new instance of serverlistener.
serverListener = new TcpListener(IPAddress.Parse(listenIP), listenPort);
this.clients = new List<ServerClient>();
this.Encoding = Encoding.Default;
}

~Server()
{
// Shut down the server.
this.Stop();
}

public Encoding Encoding { get; set; }

public IEnumerable<TcpClient> TcpClients
{
get
{
foreach (ServerClient client in this.clients)
{
yield return client.TcpClient;
}
}
}

public IEnumerable<TcpClient> TcpClients
{
get
{
foreach (ServerClient client in this.clients)
{
yield return client.TcpClient;
}
}
}

public void Stop()
{
this.serverListener.Stop();
lock (this.clients)
{
foreach (ServerClient client in this.clients)
{
client.TcpClient.Client.Disconnect(false);
if (connectionClosed != null)
connectionClosed(client.ID, client.TcpClient);
}
this.clients.Clear();
}
}

public void WriteToClient(TcpClient tcpClient, byte[] bytes)
{
NetworkStream networkStream = tcpClient.GetStream();

try
{
networkStream.BeginWrite(bytes, 0, bytes.Length, WriteCallback, tcpClient);
}
catch (System.IO.IOException ex)
{
// Port was closed before data could be written.
// So remove this guy from clients.
lock (this.clients)
{
foreach (ServerClient cl in clients)
{
if (cl.TcpClient.Equals(tcpClient))
{
this.clients.Remove(cl);
if (connectionClosed != null)
connectionClosed(cl.ID, cl.TcpClient);
break;
}
}
}

}
}

private void WriteCallback(IAsyncResult result)
{
TcpClient tcpClient = result.AsyncState as TcpClient;
NetworkStream networkStream = tcpClient.GetStream();
networkStream.EndWrite(result);

// Get the ID and return it
//ServerClient client = result.AsyncState as ServerClient;

//string ipaddr = ((IPEndPoint)(tcpClient.Client.RemoteEndPoint)).Address.ToString();
string port = ((IPEndPoint)(tcpClient.Client.RemoteEndPoint)).Port.ToString();


Console.WriteLine("Write callback called for: " + port);

// if (dataWritten != null)
// dataWritten(client.ID, tcpClient);
}

private void AcceptTcpClientCallback(IAsyncResult result)
{
TcpClient tcpClient;

try
{
tcpClient = serverListener.EndAcceptTcpClient(result);
}
catch
{
// Often get this error when shutting down the server
return;
}

NetworkStream networkStream = tcpClient.GetStream();
byte[] buffer = new byte[tcpClient.ReceiveBufferSize];

// Get the IP Address.. this will be used for id purposes.
string ipaddr = ((IPEndPoint)(tcpClient.Client.RemoteEndPoint)).Address.ToString();
string port = ((IPEndPoint)(tcpClient.Client.RemoteEndPoint)).Port.ToString();

// Create a client object for this client.
ServerClient client = new ServerClient(tcpClient, buffer, ipaddr + ":" + port);

Console.WriteLine("Data availiable: " + networkStream.DataAvailable.ToString());
Console.WriteLine("Amount of data: " + tcpClient.Available.ToString());

// Lock the list and add it in.
lock (this.clients)
{
this.clients.Add(client);
}

if (networkStream.DataAvailable)
{

int read = networkStream.Read(client.Buffer, 0, client.Buffer.Length);
Console.WriteLine("Calling ReadHandle directly with " + read.ToString() + " number of bytes. for clientid: " + client.ID);
ReadHandle(client, read, networkStream);

}
else
{

Console.WriteLine("Started beginRead for client in accept connection: " + client.ID);
networkStream.BeginRead(client.Buffer, 0, client.Buffer.Length, ReadCallback, client);
//networkStream.

Console.WriteLine("Data availiable: " + networkStream.DataAvailable.ToString());
Console.WriteLine("Amount of data: " + tcpClient.Available.ToString());
}

Console.WriteLine("Starting BeginAcceptTcpClient again - client: " + client.ID);
serverListener.BeginAcceptTcpClient(AcceptTcpClientCallback, null);

// Notify owner that new connection came in
if (incomingConnection != null)
incomingConnection(client.ID, tcpClient);
}

private void ReadCallback(IAsyncResult result)
{
ServerClient client = result.AsyncState as ServerClient;


if (client == null)
{
Console.WriteLine("ReadCallback: Null client");
return;
}

int read = 0;

NetworkStream networkStream = client.NetworkStream;
try
{
read = networkStream.EndRead(result);
}
catch (System.IO.IOException ex)
{
Console.WriteLine("ReadCallback: Exception occured during reading.. Message: " + ex.Message + " client " + client.ID);
lock (this.clients)
{
this.clients.Remove(client);
if (connectionClosed != null)
connectionClosed(client.ID, client.TcpClient);
return;
}

}

ReadHandle(client, read, networkStream);
}

private void ReadHandle(ServerClient client, int read, NetworkStream networkStream)
{



// If zero bytes read, then client disconnected.
if (read == 0)
{
Console.WriteLine("ReadHandle: Read == 0, closing connection for Client: " + client.ID);
lock (this.clients)
{
this.clients.Remove(client);
if (connectionClosed != null)
connectionClosed(client.ID, client.TcpClient);
return;
}
}

//string data = this.Encoding.GetString(client.Buffer, 0, read);

// Do something with the data object here.
if (incomingData != null)
incomingData(client.Buffer, client.ID, client.TcpClient);

// Go back to accepting data from client.
try
{
networkStream.BeginRead(client.Buffer, 0, client.Buffer.Length, ReadCallback, client);
Console.WriteLine("ReadHandle: BeginRead called for client " + client.ID);
}
catch (Exception ex)
{
// Damn, we just lost the client.
Console.WriteLine("ReadHandle: Exception occured during trying to BeginRead.. Message: " + ex.Message + " client " + client.ID);
lock (this.clients)
{
this.clients.Remove(client);
if (connectionClosed != null)
connectionClosed(client.ID, client.TcpClient);
return;
}
}

}
}

internal class ServerClient
{
public ServerClient(TcpClient tcpClient, byte[] buffer, string ipaddr)
{
if (tcpClient == null) throw new ArgumentNullException("tcpClient");
if (buffer == null) throw new ArgumentNullException("tcpClient");
if (ipaddr == null) throw new ArgumentNullException("tcpClient");

this.TcpClient = tcpClient;
this.Buffer = buffer;
this.ID = ipaddr;
}

public TcpClient TcpClient { get; private set; }
public byte[] Buffer { get; private set; }
public string ID { get; private set; }
public NetworkStream NetworkStream
{
get
{
return TcpClient.GetStream();
}
}
}
}

这是客户端类 - 与服务器相比,它更小更简单。
public class Client
{
private IPAddress address;
private int port;
private string ID;

//private WaitHandle addressSet;
private TcpClient tcpClient;
private int failedConnectionCount;

public bool keepOnTrying = false;

#region Callbacks

public delegate void incomingDataCallback(byte[] buffer, string serverID);
public incomingDataCallback incomingData = null;


public delegate void connectedCallback(string serverID);
public connectedCallback clientConnected = null;

public delegate void connectionFailedCallback(string serverID);
public connectionFailedCallback clientConnectionFailed = null;

public delegate void connectionClosedCallback(string serverID);
public connectionClosedCallback connectionClosed = null;

public delegate void dataWrittenCallback(string serverID);
public dataWrittenCallback dataWritten = null;

#endregion

public Client(IPAddress address, int port)
{
this.address = address;

if (port < 0) throw new ArgumentException();

this.port = port;
this.tcpClient = new TcpClient();
this.Encoding = Encoding.Default;
this.ID = address.ToString() + ":" + port.ToString();

tcpClient.ReceiveBufferSize = 16384;
tcpClient.SendBufferSize = 16384;
}

// Destructor
~Client()
{
this.Disconnect();
}

public Encoding Encoding { get; set; }


public void Connect()
{
tcpClient.BeginConnect(address, port, ConnectCallback, null);
}

public void Disconnect()
{
tcpClient.Close();
if (connectionClosed != null)
connectionClosed(ID);
}

public void Write(byte[] bytes)
{
NetworkStream networkStream = tcpClient.GetStream();

networkStream.BeginWrite(bytes, 0, bytes.Length, WriteCallback, null);
}

private void WriteCallback(IAsyncResult result)
{
NetworkStream networkStream = tcpClient.GetStream();

if (tcpClient.Connected)
{
networkStream.EndWrite(result);
}

if (dataWritten != null)
dataWritten(ID);
}

private void ConnectCallback(IAsyncResult result)
{
// Check to see if connected successfully or not. If we didnt, then the try/catch block will increment
// the failed connection count.
try
{
tcpClient.EndConnect(result);
}
catch
{
Interlocked.Increment(ref failedConnectionCount);
if (keepOnTrying)
tcpClient.BeginConnect(address, port, ConnectCallback, null);

if (clientConnectionFailed != null)
clientConnectionFailed(ID);

return;
}

// Connected successfully.
// Now begin async read operation.

NetworkStream networkStream = tcpClient.GetStream();
byte[] buffer = new byte[tcpClient.ReceiveBufferSize];
networkStream.BeginRead(buffer, 0, buffer.Length, ReadCallback, buffer);

if (clientConnected != null)
clientConnected(ID);
}

private void ReadCallback(IAsyncResult result)
{
int read;
NetworkStream networkStream;

try
{
networkStream = tcpClient.GetStream();
read = networkStream.EndRead(result);

}
catch
{
// An error has occured when reading.. -.-
Console.WriteLine("Error occured while reading for ID: " + ID);
return;
}



// If read is 0, then connection was closed

if (read == 0)
{
if (connectionClosed != null)
connectionClosed(ID);
return;
}

if (result.IsCompleted == false)
{
Console.WriteLine("Uh oh ");
}

byte[] buffer = result.AsyncState as byte[];

if (incomingData != null)
incomingData(buffer, ID);

// Then begin reading again.
networkStream.BeginRead(buffer, 0, buffer.Length, ReadCallback, buffer);
}

}

我使用这些类的方式是这样的:
  • 创建一个类,然后创建一个服务器或客户端的对象。
  • 绑定(bind)所有回调。 IE。在您的类中为每个回调创建函数。
  • 调用服务器启动,或客户端连接。取决于您使用的是哪种。

  • 因此,要复制我的问题,请执行以下操作:
  • 在一个程序中创建服务器类,在另一个程序中创建客户端。让客户端连接到服务器。
  • 对传入的数据进行回调。我使用序列化,因此您可以执行类似的操作。
  • 让客户端一次向服务器发送一堆数据。对我来说,我在模块中将 JSON 数据转换为我自己的格式,然后将其发送到服务器。所以服务器一次得到一堆数据包。
  • 应该看到 - 如果它足够快 - 服务器会将所有数据包放入接收缓冲区,并且每次调用incomingDataCallback - 您将拥有一个包含所有数据包的缓冲区。它会为收到的每个数据包调用它。不是字节,整个数据包。

  • 所以在我去重写代码以同步并在线程中运行之前:
  • 有什么我可以做的不同/更好的事情,以便在数据进入时这样做 - 要么调用一次事件,我可以处理缓冲区中的所有数据包 - 或者 -
  • 有没有办法确保被调用的任何其他事件不会与初始事件共享相同的缓冲区?我知道这是浪费处理器时间 - 但我可以在我的incomingDataCallback 处理程序中有一个“如果前 10 个字节是 00,则返回”行。这就是为什么我想在第一个事件中将缓冲区全部清空并在后续事件中检测它们的原因。

  • 更新:由于 Servy 的评论 - 这是我如何使用这些类。不是 c/ping 一切,只是相关部分。

    节点 - 使用服务器类。
    class ModuleClient
    {
    private List<ModuleClientInfo> clients = new List<ModuleClientInfo>();
    private Server myServer = null;

    public ModuleClient()
    {
    // create a server object
    myServer = new Server("127.0.0.1", 9000);

    // Attach callbacks
    myServer.connectionClosed = connClosed;
    myServer.dataWritten = dataWritten;
    myServer.incomingConnection = incomingConn;
    myServer.incomingData = incomingData;
    }

    public void startListeningForModules()
    {
    if (!listeningForModules)
    myServer.Start();
    else
    return;

    listeningForModules = true;
    }

    private void incomingData(byte[] buffer, string clientID, TcpClient tcpClient)
    {
    Console.WriteLine("Incoming Data from " + clientID);

    incomingPacketStruct newPacket = new incomingPacketStruct();
    newPacket.clientID = clientID;
    newPacket.buffer = buffer;
    newPacket.tcpClient = tcpClient;
    }

    在传入数据中,我注意到缓冲区中有 5 个数据包,然后调用了传入数据 5 次。

    现在至于客户端的incomingData(请记住,我没有注意到传出数据中的这种行为,也不相关。假设我一次收到 10 个 json 数据包,我会将它们发送到节点 - 所以这是 10 个写入。节点将把它们都放在同一个缓冲区中,然后它会调用服务器的传入数据 10 次,每次都会看到 10 个数据包。

    客户端的传入数据:
    public partial class Program : ServiceBase
    {
    // Globals
    private static SocketObject.Client myHermesClient = null;
    private static JSONInterface myJsonInterface = null;

    private static void mainThread(object data)
    {

    // Take care of client and callbacks..
    myHermesClient = new SocketObject.Client(System.Net.IPAddress.Parse("127.0.0.1"), 9000);
    myHermesClient.connectionClosed = hermesConnectionClosed;
    myHermesClient.clientConnected = hermesConnected;
    myHermesClient.dataWritten = hermesDataWritten;
    myHermesClient.incomingData = hermesIncomingData;
    myHermesClient.clientConnectionFailed = hermesConnectionFailed;

    myHermesClient.keepOnTrying = true;

    // Begin async connect
    myHermesClient.Connect();


    // Main loop for service.
    while (serviceRunning)
    {
    Thread.Sleep(500);
    }

    }

    #region Hermes Client Code
    private static void hermesIncomingData(byte[] buffer, string serverID)
    {

    }

    再说一遍,同样的事情。当服务器向客户端发送大量数据时.. 如果你打破并查看缓冲区,你就会明白我在说什么。

    现在,想说清楚。我的问题不是分解数据包。我有代码(不包括,因为专有,与此无关 - 它不修改缓冲区,仅从中创建对象列表) - 但问题是回调被多次调用,如上所述。

    最佳答案

    private void ReadCallback(IAsyncResult result)

    ReadHandle(client, read, networkStream); 

    然后在里面 ReadHandle()您再次设置回拨。

    关于c# - .NET 中的异步读取问题 - 似乎我的回调在不应该被调用时被多次调用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/15170738/

    24 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com