gpt4 book ai didi

c# - 异步使用 NamedPipeServerStream 和 NamedPipeClientStream

转载 作者:行者123 更新时间:2023-12-02 11:00:43 25 4
gpt4 key购买 nike

我对服务器/客户端架构有以下要求:

  1. 编写异步工作的服务器/客户端。

  2. 通信需要是双工的,即两端都可以读取和写入。

  3. 多个客户端可以在任何给定时间连接到服务器。

  4. 服务器/客户端应等待,直到它们可用并最终建立连接。

  5. 客户端连接后,它应该写入流。

  6. 然后服务器应该从流中读取并将响应写回客户端。

  7. 最后,客户端应读取响应,通信应结束。

因此,考虑到以下要求,我编写了以下代码,但我不太确定它,因为管道的文档有些缺乏,不幸的是,代码似乎无法正常工作,它卡在某个点。

namespace PipesAsyncAwait471
{
using System;
using System.Collections.Generic;
using System.IO.Pipes;
using System.Linq;
using System.Threading.Tasks;

internal class Program
{
private static async Task Main()
{
List<Task> tasks = new List<Task> {
HandleRequestAsync(),
};

tasks.AddRange(Enumerable.Range(0, 10).Select(i => SendRequestAsync(i, 0, 5)));

await Task.WhenAll(tasks);
}

private static async Task HandleRequestAsync()
{
using (NamedPipeServerStream server = new NamedPipeServerStream("MyPipe",
PipeDirection.InOut,
NamedPipeServerStream.MaxAllowedServerInstances,
PipeTransmissionMode.Message,
PipeOptions.Asynchronous))
{
Console.WriteLine("Waiting...");

await server.WaitForConnectionAsync().ConfigureAwait(false);

if (server.IsConnected)
{
Console.WriteLine("Connected");

if (server.CanRead) {
// Read something...
}

if (server.CanWrite) {
// Write something...

await server.FlushAsync().ConfigureAwait(false);

server.WaitForPipeDrain();
}

server.Disconnect();

await HandleRequestAsync().ConfigureAwait(false);
}
}
}

private static async Task SendRequestAsync(int index, int counter, int max)
{
using (NamedPipeClientStream client = new NamedPipeClientStream(".", "MyPipe", PipeDirection.InOut, PipeOptions.Asynchronous))
{
await client.ConnectAsync().ConfigureAwait(false);

if (client.IsConnected)
{
Console.WriteLine($"Index: {index} Counter: {counter}");

if (client.CanWrite) {
// Write something...

await client.FlushAsync().ConfigureAwait(false);

client.WaitForPipeDrain();
}

if (client.CanRead) {
// Read something...
}
}

if (counter <= max) {
await SendRequestAsync(index, ++counter, max).ConfigureAwait(false);
}
else {
Console.WriteLine($"{index} Done!");
}
}
}
}
}

假设:

我期望它的工作方式是当我调用 SendRequestAsync 时发出的所有请求并发执行,每个请求然后发出额外的请求,直到达到 6 并且最后,它应该打印“完成!”。

备注:

  1. 我在 .NET Framework 4.7.1 和 .NET Core 2.0 上对其进行了测试,得到了相同的结果。

  2. 客户端和服务器之间的通信始终位于计算机本地,其中客户端是可以对某些作业进行排队的 Web 应用程序,例如启动第 3 方进程和服务器将作为 Windows 服务部署在与部署这些客户端的 Web 服务器相同的计算机上。

最佳答案

这是经过一些迭代后的完整代码:

PipeServer.cs:

namespace AsyncPipes;

using System.Diagnostics.CodeAnalysis;
using System.IO.Pipes;

public static class PipeServer
{
public static void WaitForConnection()
=> WaitForConnectionInitializer();

private static void WaitForConnectionInitializer()
{
var context = new ServerContext();
var server = context.Server;

try
{
Console.WriteLine($"Waiting a client...");

server.BeginWaitForConnection(WaitForConnectionCallback, context);
}
catch
{
// We need to cleanup here only when something goes wrong.
context.Dispose();

throw;
}

static void WaitForConnectionCallback(IAsyncResult result)
{
var (context, server, _) = ServerContext.FromResult(result);

server.EndWaitForConnection(result);

WaitForConnectionInitializer();

BeginRead(context);
}

static void BeginRead(ServerContext context)
{
var (_, server, requestBuffer) = context;

server.BeginRead(requestBuffer, 0, requestBuffer.Length, ReadCallback, context);
}

static void BeginWrite(ServerContext context)
{
var (_, server, responseBuffer) = context;

server.BeginWrite(responseBuffer, 0, responseBuffer.Length, WriteCallback, context);
}

static void ReadCallback(IAsyncResult result)
{
var (context, server, requestBuffer) = ServerContext.FromResult(result);

var bytesRead = server.EndRead(result);

if (bytesRead > 0)
{
if (!server.IsMessageComplete)
{
BeginRead(context);
}
else
{
var index = BitConverter.ToInt32(requestBuffer, 0);
Console.WriteLine($"{index} Request.");

BeginWrite(context);
}
}
}

static void WriteCallback(IAsyncResult result)
{
var (context, server, responseBuffer) = ServerContext.FromResult(result);
var index = -1;

try
{
server.EndWrite(result);
server.WaitForPipeDrain();

index = BitConverter.ToInt32(responseBuffer, 0);
Console.WriteLine($"{index} Pong.");
}
finally
{
context.Dispose();
Console.WriteLine($"{index} Disposed.");
}
}
}

private sealed class ServerContext : IDisposable
{
[NotNull]
public byte[]? Buffer { get; private set; } = new byte[4];

[NotNull]
public NamedPipeServerStream? Server { get; private set; } = new ("PipesDemo",
PipeDirection.InOut,
NamedPipeServerStream.MaxAllowedServerInstances,
PipeTransmissionMode.Message,
PipeOptions.Asynchronous);

public void Deconstruct(out ServerContext context, out NamedPipeServerStream server, out byte[] buffer)
=> (context, server, buffer) = (this, Server, Buffer);

public static ServerContext FromResult(IAsyncResult result)
{
ArgumentNullException.ThrowIfNull(result.AsyncState);

return (ServerContext)result.AsyncState;
}

public void Dispose()
{
if (Server is not null)
{
if (Server.IsConnected)
{
Server.Disconnect();
}

Server.Dispose();
}

Server = null;
Buffer = null;
}
}
}

PipeClient:

public static class PipeClient
{
public static void CreateConnection(int index)
{
using var client = new NamedPipeClientStream(".", "PipesDemo", PipeDirection.InOut, PipeOptions.None);
client.Connect();

var requestBuffer = BitConverter.GetBytes(index);
client.Write(requestBuffer, 0, requestBuffer.Length);
client.Flush();
client.WaitForPipeDrain();
Console.WriteLine($"{index} Ping.");

var responseBuffer = new byte[4];
var bytesRead = client.Read(responseBuffer, 0, responseBuffer.Length);

while (bytesRead > 0)
{
bytesRead = client.Read(responseBuffer, bytesRead - 1, responseBuffer.Length - bytesRead);
}

index = BitConverter.ToInt32(responseBuffer, 0);
Console.WriteLine($"{index} Response.");
}
}

Program.cs:

namespace AsyncPipes;

internal class Program
{
private const int MaxRequests = 1000;

private static void Main()
{
var tasks = new List<Task>
{
Task.Run(PipeServer.WaitForConnection)
};

tasks.AddRange(Enumerable.Range(0, MaxRequests - 1)
.Select(i => Task.Factory.StartNew(() => PipeClient.CreateConnection(i),
TaskCreationOptions.LongRunning)));

Task.WaitAll(tasks.ToArray());

Console.ReadKey();
}
}

您可以对消息进行排序并观察以下内容:

  1. 连接已正确打开和关闭。

  2. 数据发送和接收正确。

  3. 最后,服务器仍然等待进一步的连接。

更新:

PipeOptions.Asynchronous 更改为 PipeOptions.None,否则看起来它在请求期间挂起,然后才立即处理它们。

PipeOptions.Asynchronous 只是导致执行顺序与 PipeOptions.None 不同,这会在代码中暴露出竞争条件/死锁。例如,如果您使用任务管理器来监视进程的线程计数,您可以看到它的效果...您应该看到它以每秒 1 个线程的速度爬升,直到达到大约 100 个线程(可能是 110 左右),此时您的代码将运行完成。或者如果您在开头添加 ThreadPool.SetMinThreads(200, 200) 。您的代码存在一个问题,如果发生错误的排序(使用异步更容易出现这种情况),您将创建一个循环,直到有足够的线程来运行您的 main 方法已排队的所有并发 ConnectAsyncs 为止。 ,这并不是真正的异步,而是只是创建一个工作项来调用同步 Connect 方法(这是不幸的,这样的问题是我敦促人们不要公开将工作项简单地排队到的异步 API 的原因之一)调用同步方法)。 Source .

修改并简化了示例:

  1. 管道没有真正的异步 Connect 方法,ConnectAsync 在幕后使用 Task.Factory.StartNew,因此您可能只是也可以使用 Connect,然后将调用同步 Connect 版本的方法(在我们的示例中为 SendRequest)传递给 Task.Factory。开始新的

  2. 服务器现在完全异步,据我所知,它可以正常工作。

  3. 修复了所有 BeginXXX/EndXXX 方法。

  4. 删除了不必要的 try/catch block 。

  5. 删除了不必要的消息。

  6. 稍微重构一下代码,使其更具可读性和简洁性。

  7. 删除了服务器的 async/await 版本,因为我重构了代码,并且没有时间更新 async/await 版本,但是使用上面的版本,您可以了解如何做到这一点以及新的 API 更加友好且易于处理。

希望对您有所帮助。

关于c# - 异步使用 NamedPipeServerStream 和 NamedPipeClientStream,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48059410/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com