- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
基本上,我正在为一个设计为非常快速、小巧和健壮的系统编写一些代码。我从 TcpListener
的一些异步示例开始。和 TcpClient
并写了一个Server和Client类,基本上在我的项目中多处使用。
基本上我的服务器类(代码将在后面)都是基于事件的,客户端代码也是如此。当我收到数据包 - 一个接一个 - 通过服务器或客户端套接字进入时 - 一切正常。
然而,如果发送方——例如 A 类使用客户端类——通过 TCP 流向 B 类中的服务器类发送一堆数据包。当然,服务器类可能将所有数据包作为一个大集水器。所以当数据接收事件的回调发生时,我抓取缓冲区然后处理它。
这就是有趣的事情发生的地方。我的问题不是从大缓冲区中拆分所有数据包。我的问题是,由于某种我无法理解的原因.. 假设我从客户端向服务器发送 5 个数据包(反之亦然),另一方得到全部 5 个数据包。 datarecieve 事件触发,然后窃贼被捕获,所有 5 个数据包数据包在那里。他们被处理。但随后事件再次触发..
换句话说,不是事件触发一次,而是为 5 个单独的数据包触发 5 次,我最终处理了包含这 5 个数据包的缓冲区 5 次。
由于我正在设计分布式网络,这意味着模块与之通信的节点(模块(客户端类)<--> 节点(服务器类)<--> 客户端(客户端类))获得 25 个数据包而不是 5 个. 然后它将这些转发到目的地,得到 25*5 或 125 个数据包。
我很确定我在这里遗漏了一些明显的东西。我试过想办法让事件只触发一次..我最终可能会认输并重写 Server 和 Client 类,以便它们同步并且每个客户端实例(或在服务器端)有一个线程,一个接受的线程,每个客户端连接一个线程) - 这样我就可以更好地处理数据流。 IE。数据包进入,如果它的整个过程。如果不是,请等待它完整等等。使用典型的开始/结束特殊字节等等。
服务器类 - 大部分都在那里。去掉了一些不相关的,比如 KillClient 等。
public class Server
{
private TcpListener serverListener;
private List<ServerClient> clients;
#region Callbacks
public delegate void incomingDataCallback(byte[] buffer, string clientID, TcpClient tcpClient);
public incomingDataCallback incomingData = null;
public delegate void incomingConnectionCallback(string clientID, TcpClient tcpClient);
public incomingConnectionCallback incomingConnection = null;
public delegate void connectionClosedCallback(string clientID, TcpClient tcpClient);
public connectionClosedCallback connectionClosed = null;
public delegate void dataWrittenCallback(string clientID, TcpClient tcpClient);
public dataWrittenCallback dataWritten = null;
#endregion
// Constructor
public Server(string listenIP, int listenPort)
{
// Create a new instance of serverlistener.
serverListener = new TcpListener(IPAddress.Parse(listenIP), listenPort);
this.clients = new List<ServerClient>();
this.Encoding = Encoding.Default;
}
~Server()
{
// Shut down the server.
this.Stop();
}
public Encoding Encoding { get; set; }
public IEnumerable<TcpClient> TcpClients
{
get
{
foreach (ServerClient client in this.clients)
{
yield return client.TcpClient;
}
}
}
public IEnumerable<TcpClient> TcpClients
{
get
{
foreach (ServerClient client in this.clients)
{
yield return client.TcpClient;
}
}
}
public void Stop()
{
this.serverListener.Stop();
lock (this.clients)
{
foreach (ServerClient client in this.clients)
{
client.TcpClient.Client.Disconnect(false);
if (connectionClosed != null)
connectionClosed(client.ID, client.TcpClient);
}
this.clients.Clear();
}
}
public void WriteToClient(TcpClient tcpClient, byte[] bytes)
{
NetworkStream networkStream = tcpClient.GetStream();
try
{
networkStream.BeginWrite(bytes, 0, bytes.Length, WriteCallback, tcpClient);
}
catch (System.IO.IOException ex)
{
// Port was closed before data could be written.
// So remove this guy from clients.
lock (this.clients)
{
foreach (ServerClient cl in clients)
{
if (cl.TcpClient.Equals(tcpClient))
{
this.clients.Remove(cl);
if (connectionClosed != null)
connectionClosed(cl.ID, cl.TcpClient);
break;
}
}
}
}
}
private void WriteCallback(IAsyncResult result)
{
TcpClient tcpClient = result.AsyncState as TcpClient;
NetworkStream networkStream = tcpClient.GetStream();
networkStream.EndWrite(result);
// Get the ID and return it
//ServerClient client = result.AsyncState as ServerClient;
//string ipaddr = ((IPEndPoint)(tcpClient.Client.RemoteEndPoint)).Address.ToString();
string port = ((IPEndPoint)(tcpClient.Client.RemoteEndPoint)).Port.ToString();
Console.WriteLine("Write callback called for: " + port);
// if (dataWritten != null)
// dataWritten(client.ID, tcpClient);
}
private void AcceptTcpClientCallback(IAsyncResult result)
{
TcpClient tcpClient;
try
{
tcpClient = serverListener.EndAcceptTcpClient(result);
}
catch
{
// Often get this error when shutting down the server
return;
}
NetworkStream networkStream = tcpClient.GetStream();
byte[] buffer = new byte[tcpClient.ReceiveBufferSize];
// Get the IP Address.. this will be used for id purposes.
string ipaddr = ((IPEndPoint)(tcpClient.Client.RemoteEndPoint)).Address.ToString();
string port = ((IPEndPoint)(tcpClient.Client.RemoteEndPoint)).Port.ToString();
// Create a client object for this client.
ServerClient client = new ServerClient(tcpClient, buffer, ipaddr + ":" + port);
Console.WriteLine("Data availiable: " + networkStream.DataAvailable.ToString());
Console.WriteLine("Amount of data: " + tcpClient.Available.ToString());
// Lock the list and add it in.
lock (this.clients)
{
this.clients.Add(client);
}
if (networkStream.DataAvailable)
{
int read = networkStream.Read(client.Buffer, 0, client.Buffer.Length);
Console.WriteLine("Calling ReadHandle directly with " + read.ToString() + " number of bytes. for clientid: " + client.ID);
ReadHandle(client, read, networkStream);
}
else
{
Console.WriteLine("Started beginRead for client in accept connection: " + client.ID);
networkStream.BeginRead(client.Buffer, 0, client.Buffer.Length, ReadCallback, client);
//networkStream.
Console.WriteLine("Data availiable: " + networkStream.DataAvailable.ToString());
Console.WriteLine("Amount of data: " + tcpClient.Available.ToString());
}
Console.WriteLine("Starting BeginAcceptTcpClient again - client: " + client.ID);
serverListener.BeginAcceptTcpClient(AcceptTcpClientCallback, null);
// Notify owner that new connection came in
if (incomingConnection != null)
incomingConnection(client.ID, tcpClient);
}
private void ReadCallback(IAsyncResult result)
{
ServerClient client = result.AsyncState as ServerClient;
if (client == null)
{
Console.WriteLine("ReadCallback: Null client");
return;
}
int read = 0;
NetworkStream networkStream = client.NetworkStream;
try
{
read = networkStream.EndRead(result);
}
catch (System.IO.IOException ex)
{
Console.WriteLine("ReadCallback: Exception occured during reading.. Message: " + ex.Message + " client " + client.ID);
lock (this.clients)
{
this.clients.Remove(client);
if (connectionClosed != null)
connectionClosed(client.ID, client.TcpClient);
return;
}
}
ReadHandle(client, read, networkStream);
}
private void ReadHandle(ServerClient client, int read, NetworkStream networkStream)
{
// If zero bytes read, then client disconnected.
if (read == 0)
{
Console.WriteLine("ReadHandle: Read == 0, closing connection for Client: " + client.ID);
lock (this.clients)
{
this.clients.Remove(client);
if (connectionClosed != null)
connectionClosed(client.ID, client.TcpClient);
return;
}
}
//string data = this.Encoding.GetString(client.Buffer, 0, read);
// Do something with the data object here.
if (incomingData != null)
incomingData(client.Buffer, client.ID, client.TcpClient);
// Go back to accepting data from client.
try
{
networkStream.BeginRead(client.Buffer, 0, client.Buffer.Length, ReadCallback, client);
Console.WriteLine("ReadHandle: BeginRead called for client " + client.ID);
}
catch (Exception ex)
{
// Damn, we just lost the client.
Console.WriteLine("ReadHandle: Exception occured during trying to BeginRead.. Message: " + ex.Message + " client " + client.ID);
lock (this.clients)
{
this.clients.Remove(client);
if (connectionClosed != null)
connectionClosed(client.ID, client.TcpClient);
return;
}
}
}
}
internal class ServerClient
{
public ServerClient(TcpClient tcpClient, byte[] buffer, string ipaddr)
{
if (tcpClient == null) throw new ArgumentNullException("tcpClient");
if (buffer == null) throw new ArgumentNullException("tcpClient");
if (ipaddr == null) throw new ArgumentNullException("tcpClient");
this.TcpClient = tcpClient;
this.Buffer = buffer;
this.ID = ipaddr;
}
public TcpClient TcpClient { get; private set; }
public byte[] Buffer { get; private set; }
public string ID { get; private set; }
public NetworkStream NetworkStream
{
get
{
return TcpClient.GetStream();
}
}
}
}
public class Client
{
private IPAddress address;
private int port;
private string ID;
//private WaitHandle addressSet;
private TcpClient tcpClient;
private int failedConnectionCount;
public bool keepOnTrying = false;
#region Callbacks
public delegate void incomingDataCallback(byte[] buffer, string serverID);
public incomingDataCallback incomingData = null;
public delegate void connectedCallback(string serverID);
public connectedCallback clientConnected = null;
public delegate void connectionFailedCallback(string serverID);
public connectionFailedCallback clientConnectionFailed = null;
public delegate void connectionClosedCallback(string serverID);
public connectionClosedCallback connectionClosed = null;
public delegate void dataWrittenCallback(string serverID);
public dataWrittenCallback dataWritten = null;
#endregion
public Client(IPAddress address, int port)
{
this.address = address;
if (port < 0) throw new ArgumentException();
this.port = port;
this.tcpClient = new TcpClient();
this.Encoding = Encoding.Default;
this.ID = address.ToString() + ":" + port.ToString();
tcpClient.ReceiveBufferSize = 16384;
tcpClient.SendBufferSize = 16384;
}
// Destructor
~Client()
{
this.Disconnect();
}
public Encoding Encoding { get; set; }
public void Connect()
{
tcpClient.BeginConnect(address, port, ConnectCallback, null);
}
public void Disconnect()
{
tcpClient.Close();
if (connectionClosed != null)
connectionClosed(ID);
}
public void Write(byte[] bytes)
{
NetworkStream networkStream = tcpClient.GetStream();
networkStream.BeginWrite(bytes, 0, bytes.Length, WriteCallback, null);
}
private void WriteCallback(IAsyncResult result)
{
NetworkStream networkStream = tcpClient.GetStream();
if (tcpClient.Connected)
{
networkStream.EndWrite(result);
}
if (dataWritten != null)
dataWritten(ID);
}
private void ConnectCallback(IAsyncResult result)
{
// Check to see if connected successfully or not. If we didnt, then the try/catch block will increment
// the failed connection count.
try
{
tcpClient.EndConnect(result);
}
catch
{
Interlocked.Increment(ref failedConnectionCount);
if (keepOnTrying)
tcpClient.BeginConnect(address, port, ConnectCallback, null);
if (clientConnectionFailed != null)
clientConnectionFailed(ID);
return;
}
// Connected successfully.
// Now begin async read operation.
NetworkStream networkStream = tcpClient.GetStream();
byte[] buffer = new byte[tcpClient.ReceiveBufferSize];
networkStream.BeginRead(buffer, 0, buffer.Length, ReadCallback, buffer);
if (clientConnected != null)
clientConnected(ID);
}
private void ReadCallback(IAsyncResult result)
{
int read;
NetworkStream networkStream;
try
{
networkStream = tcpClient.GetStream();
read = networkStream.EndRead(result);
}
catch
{
// An error has occured when reading.. -.-
Console.WriteLine("Error occured while reading for ID: " + ID);
return;
}
// If read is 0, then connection was closed
if (read == 0)
{
if (connectionClosed != null)
connectionClosed(ID);
return;
}
if (result.IsCompleted == false)
{
Console.WriteLine("Uh oh ");
}
byte[] buffer = result.AsyncState as byte[];
if (incomingData != null)
incomingData(buffer, ID);
// Then begin reading again.
networkStream.BeginRead(buffer, 0, buffer.Length, ReadCallback, buffer);
}
}
class ModuleClient
{
private List<ModuleClientInfo> clients = new List<ModuleClientInfo>();
private Server myServer = null;
public ModuleClient()
{
// create a server object
myServer = new Server("127.0.0.1", 9000);
// Attach callbacks
myServer.connectionClosed = connClosed;
myServer.dataWritten = dataWritten;
myServer.incomingConnection = incomingConn;
myServer.incomingData = incomingData;
}
public void startListeningForModules()
{
if (!listeningForModules)
myServer.Start();
else
return;
listeningForModules = true;
}
private void incomingData(byte[] buffer, string clientID, TcpClient tcpClient)
{
Console.WriteLine("Incoming Data from " + clientID);
incomingPacketStruct newPacket = new incomingPacketStruct();
newPacket.clientID = clientID;
newPacket.buffer = buffer;
newPacket.tcpClient = tcpClient;
}
public partial class Program : ServiceBase
{
// Globals
private static SocketObject.Client myHermesClient = null;
private static JSONInterface myJsonInterface = null;
private static void mainThread(object data)
{
// Take care of client and callbacks..
myHermesClient = new SocketObject.Client(System.Net.IPAddress.Parse("127.0.0.1"), 9000);
myHermesClient.connectionClosed = hermesConnectionClosed;
myHermesClient.clientConnected = hermesConnected;
myHermesClient.dataWritten = hermesDataWritten;
myHermesClient.incomingData = hermesIncomingData;
myHermesClient.clientConnectionFailed = hermesConnectionFailed;
myHermesClient.keepOnTrying = true;
// Begin async connect
myHermesClient.Connect();
// Main loop for service.
while (serviceRunning)
{
Thread.Sleep(500);
}
}
#region Hermes Client Code
private static void hermesIncomingData(byte[] buffer, string serverID)
{
}
最佳答案
内private void ReadCallback(IAsyncResult result)
ReadHandle(client, read, networkStream);
ReadHandle()
您再次设置回拨。
关于c# - .NET 中的异步读取问题 - 似乎我的回调在不应该被调用时被多次调用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/15170738/
我遇到以下问题。我想读取一个包含数百万行和数百列的大型 csv。我想向下转换列的数据类型。我的方法是读取 csv,然后使用 pd.to_numeric() 对其进行向下转换。我不知道列数及其类型。在读
目前,我从 SQL server (2008) 数据库获取数据。 cyurrent的方法是使用DataTable,然后将其传递并使用。 if (parameters != null)
我有以下问题。我有一个巨大的 csv 文件,想用多处理加载它。对于一个包含 500000 行和 130 列不同数据类型的示例文件,Pandas 需要 19 秒。我试过 dask 因为我想多处理阅读。但
是否有关于用于序列化各种 MFC 数据结构的二进制格式的明确文档?我已经能够在十六进制编辑器中查看我自己的一些类,并使用 Java 的 ByteBuffer 类读取它们(使用自动字节顺序转换等)。 但
我正在使用 Selenium 进行测试,我们用 HTML 文件编写测试用例,并用它们制作测试套件,我们的要求是编写足够健壮的测试用例,以根据测试环境改变自身。 为此,我不希望在 HTML 脚本本身中包
我需要一个 JavaScript 代码来读取存储为 .txt 文件的字典(或者也可以保存为任何其他类型的文件。它也可以在线获得)并将其内容存储在一个变量中。我不能找到一种让 JavaScript 像
我正在尝试遍历包含 SSH 登录和其他日志的日志文本文件。 程序正在返回 SSH 登录的总数。 我的解决方案确实有效,但似乎有点慢(在 200mo 文件上大约需要 3.5 秒)。我想知道是否有任何方法
我正在将大量数据从一个电子表格复制到工作簿中的其他 160 个电子表格。目前,Excel (2013) 遇到错误,因为它没有足够的资源来完成操作。 我的目标是将工作表 4 中 V13:XI1150 范
我正在尝试读取一个有 1147 行的文本文件。下面的代码仅读取第 1050-1147 行。我的目标是读取整个文件并提取位于不同行的特定值以在脚本中使用。一个示例是包含“BlockList: 2”的行中
我正在为游戏编写解释器。用户将其移动输入解释器,程序执行该移动。 现在我想为每个决定实现一个时间限制。玩家不应该能够思考超过 30 秒来写一个移动并按下回车。 call_with_time_limit
以this file例如,我正在尝试读取 data.frame 中的数据。来自 the doc (pdf 文件,表 1),它遵循一些 fortran 约定。我尝试了以下但收效甚微: dir 0' 将
我正在使用 R 阅读 Outlook 附件。我的引用在这里:Download attachment from an outlook email using R 这是我的电子邮件的截图: 这每天都会发送
我不会从表格中读取行来将主题放在列表中 php脚本 $url_obj='http://'.$host.':8069/xmlrpc/object'; $sock=new xmlrpc_client($u
我有一个这样的 csv 文件: id,name,value 1,peter,5 2,peter\,paul,3 我如何读取此文件并告诉 R "\," 不表示新列,仅表示 ","。 我必须添加该文件
我正在尝试读取 ~/Library/Preferences/com.apple.mail.plist (在 Snow Leopard 上)以获取电子邮件地址和其他信息以进入“关于”对话框。我使用以下代
This question already has answers here: How do I use floating-point division in bash? (19个回答) 5个月前关闭
本练习的目标是读取输入文件并将其存储到表中,然后验证输入中的某些字段并输出任何错误记录。我需要读取并存储每个策略组,以便表中一次仅存储 5 条记录,而不是整个文件。 所以我需要读取一个包含 5 条记录
据我了解,LWT 插入始终以 SERIAL 一致性级别完成。如果为 true,这是否意味着读取作为 LWT 插入的行可以安全地以 ANY 的一致性级别读取? 换句话说,我假设 LWT 插入是完全一致的
我看到很多很多通过java脚本读取cookie的函数,但我只想在变量中使用它一次,我是JS新手。 这是我的代码 var TheNumber = (Math.random() + '') * 10000
我正在使用 asp.net 和 C#。我在服务器上部署了一个应用程序[已发布],现在我想查看该网站的代码,据我所知,我可以阅读程序集来查看代码。 请告诉我如何实现它。 提前致谢。 最佳答案 您可以使用
我是一名优秀的程序员,十分优秀!