gpt4 book ai didi

c# - CancellationToken请求取消时NamedPipeServerStream.ReadAsync()不退出

转载 作者:太空狗 更新时间:2023-10-29 20:09:18 29 4
gpt4 key购买 nike

当 NamedPipeServer 流从管道中读取任何数据时,它不会对 CancellationTokenSource.Cancel() 使用react

这是为什么?

如何限制我在服务器中等待来自客户端的数据的时间?

重现代码:

static void Main(string[] args)
{
Server();
Clinet();
Console.WriteLine("press [enter] to exit");
Console.ReadLine();
}

private static async Task Server()
{
using (var cancellationTokenSource = new CancellationTokenSource(1000))
using (var server = new NamedPipeServerStream("test",
PipeDirection.InOut,
1,
PipeTransmissionMode.Byte,
PipeOptions.Asynchronous))
{
var cancellationToken = cancellationTokenSource.Token;
await server.WaitForConnectionAsync(cancellationToken);
await server.WriteAsync(new byte[]{1,2,3,4}, 0, 4, cancellationToken);
var buffer = new byte[4];
await server.ReadAsync(buffer, 0, 4, cancellationToken);
Console.WriteLine("exit server");
}
}

private static async Task Clinet()
{
using (var client = new NamedPipeClientStream(".", "test", PipeDirection.InOut, PipeOptions.Asynchronous))
{
var buffer = new byte[4];
client.Connect();
client.Read(buffer, 0, 4);
await Task.Delay(5000);
await client.WriteAsync(new byte[] {1, 2, 3, 4}, 0, 4);
Console.WriteLine("client exit");
}
}

预期结果:

exit server
<client throws exception cuz server closed pipe>

实际结果:

client exit
exit server

编辑

CancelIo 的答案似乎很有希望,它确实允许服务器在取消 token 被取消时结束通信。但是,我不明白为什么我的“基本方案”在使用 ReadPipeAsync 时停止工作。

代码如下,包含两个客户端函数:

  1. Clinet_ShouldWorkFine - 一个及时读/写的好客户端
  2. Clinet_ServerShouldEndCommunication_CuzClientIsSlow - 客户端太慢,服务器应该结束通信

预期:

  1. Clinet_ShouldWorkFine - 执行结束,没有任何异常
  2. Clinet_ServerShouldEndCommunication_CuzClientIsSlow - 服务器关闭管道,客户端抛出异常

实际:

  1. Clinet_ShouldWorkFine - 服务器在第一次调用 ReadPipeAsync 时停止,管道在 1 秒后关闭,客户端抛出异常
  2. Clinet_ServerShouldEndCommunication_CuzClientIsSlow - 服务器关闭管道,客户端抛出异常

为什么 Clinet_ShouldWorkFine 在服务器使用 ReadPipeAsync 时不起作用

class Program
{
static void Main(string[] args) {
// in this case server should close the pipe cuz client is too slow
try {
var tasks = new Task[3];
tasks[0] = Server();
tasks[1] = tasks[0].ContinueWith(c => {
Console.WriteLine($"Server exited, cancelled={c.IsCanceled}");
});
tasks[2] = Clinet_ServerShouldEndCommunication_CuzClientIsSlow();
Task.WhenAll(tasks).Wait();
}
catch (Exception ex) {
Console.WriteLine(ex);
}

// in this case server should exchange data with client fine
try {
var tasks = new Task[3];
tasks[0] = Server();
tasks[1] = tasks[0].ContinueWith(c => {
Console.WriteLine($"Server exited, cancelled={c.IsCanceled}");
});
tasks[2] = Clinet_ShouldWorkFine();
Task.WhenAll(tasks).Wait();
}
catch (Exception ex) {
Console.WriteLine(ex);
}

Console.WriteLine("press [enter] to exit");
Console.ReadLine();
}

private static async Task Server()
{
using (var cancellationTokenSource = new CancellationTokenSource(1000))
using (var server = new NamedPipeServerStream("test",
PipeDirection.InOut,
1,
PipeTransmissionMode.Byte,
PipeOptions.Asynchronous))
{
var cancellationToken = cancellationTokenSource.Token;
await server.WaitForConnectionAsync(cancellationToken);
await server.WriteAsync(new byte[]{1,2,3,4}, 0, 4, cancellationToken);
await server.WriteAsync(new byte[]{1,2,3,4}, 0, 4, cancellationToken);
var buffer = new byte[4];
var bytes = await server.ReadPipeAsync(buffer, 0, 4, cancellationToken);
var bytes2 = await server.ReadPipeAsync(buffer, 0, 4, cancellationToken);
Console.WriteLine("exit server");
}
}

private static async Task Clinet_ShouldWorkFine()
{
using (var client = new NamedPipeClientStream(".", "test", PipeDirection.InOut, PipeOptions.Asynchronous))
{
var buffer = new byte[4];
client.Connect();
client.Read(buffer, 0, 4);
client.Read(buffer, 0, 4);
await client.WriteAsync(new byte[] {1, 2, 3, 4}, 0, 4);
await client.WriteAsync(new byte[] {1, 2, 3, 4}, 0, 4);
Console.WriteLine("client exit");
}
}

private static async Task Clinet_ServerShouldEndCommunication_CuzClientIsSlow()
{
using (var client = new NamedPipeClientStream(".", "test", PipeDirection.InOut, PipeOptions.Asynchronous))
{
var buffer = new byte[4];
client.Connect();
client.Read(buffer, 0, 4);
client.Read(buffer, 0, 4);
await Task.Delay(5000);
await client.WriteAsync(new byte[] {1, 2, 3, 4}, 0, 4);
await client.WriteAsync(new byte[] {1, 2, 3, 4}, 0, 4);
Console.WriteLine("client exit");
}
}
}

public static class AsyncPipeFixer {

public static Task<int> ReadPipeAsync(this PipeStream pipe, byte[] buffer, int offset, int count, CancellationToken cancellationToken) {
if (cancellationToken.IsCancellationRequested) return Task.FromCanceled<int>(cancellationToken);
var registration = cancellationToken.Register(() => CancelPipeIo(pipe));
var async = pipe.BeginRead(buffer, offset, count, null, null);
return new Task<int>(() => {
try { return pipe.EndRead(async); }
finally { registration.Dispose(); }
}, cancellationToken);
}

private static void CancelPipeIo(PipeStream pipe) {
// Note: no PipeStream.IsDisposed, we'll have to swallow
try {
CancelIo(pipe.SafePipeHandle);
}
catch (ObjectDisposedException) { }
}
[DllImport("kernel32.dll")]
private static extern bool CancelIo(SafePipeHandle handle);

}

最佳答案

.NET 程序员在编写像这样的小测试程序时会遇到 async/await 的可怕麻烦。它的构图很差,一路上都是乌龟。这个程序缺少最后的乌龟,任务陷入僵局。没有人负责让任务继续执行,这通常发生在(比如)GUI 应用程序中。也极难调试。

首先做一个小改动,让死锁完全可见:

   int bytes = await server.ReadPipeAsync(buffer, 0, 4, cancellationTokenSource.Token);

这消除了一个令人讨厌的小角落案例,服务器方法一直到“服务器退出”消息。 Task 类的一个长期问题是,当任务完成或等待的方法同步完成时,它将尝试直接运行延续。这恰好适用于该程序。通过强制它获得异步结果,死锁现在很明显。


下一步是修复 Main(),这样这些任务就不会再死锁了。这可能看起来像这样:

static void Main(string[] args) {
try {
var tasks = new Task[3];
tasks[0] = Server();
tasks[1] = tasks[0].ContinueWith(c => {
Console.WriteLine($"Server exited, cancelled={c.IsCanceled}");
});
tasks[2] = Clinet();
Task.WhenAll(tasks).Wait();
}
catch (Exception ex) {
Console.WriteLine(ex);
}
Console.WriteLine("press [enter] to exit");
Console.ReadLine();
}

现在我们有机会取得进展并实际解决取消问题。 NamedPipeServerStream 类本身不实现 ReadAsync,它从其基类之一 Stream 继承该方法。它有一个破烂的小细节,完全没有记录。只有盯着 framework source code 才能看到.它只能在您调用 ReadAsync() 之前发生取消时检测取消。一旦读取开始,它就不再能看到取消。您要解决的最终问题。

这是一个可以解决的问题,我只是一个模糊的想法,为什么微软没有为 PipeStreams 这样做。强制 BeginRead() 方法提前完成的正常方法是 Dispose() 对象,这也是可以中断 Stream.ReadAsync() 的唯一方法。但还有另一种方法,在 Windows 上可以使用 CancelIo() 中断 I/O 操作。 .让我们把它变成一个扩展方法:

using System;
using System.Threading.Tasks;
using System.Runtime.InteropServices;
using System.IO.Pipes;
using Microsoft.Win32.SafeHandles;

public static class AsyncPipeFixer {

public static Task<int> ReadPipeAsync(this PipeStream pipe, byte[] buffer, int offset, int count, CancellationToken cancellationToken) {
if (cancellationToken.IsCancellationRequested) return Task.FromCanceled<int>(cancellationToken);
var registration = cancellationToken.Register(() => CancelPipeIo(pipe));
var async = pipe.BeginRead(buffer, offset, count, null, null);
return new Task<int>(() => {
try { return pipe.EndRead(async); }
finally { registration.Dispose(); }
}, cancellationToken);
}

private static void CancelPipeIo(PipeStream pipe) {
// Note: no PipeStream.IsDisposed, we'll have to swallow
try {
CancelIo(pipe.SafePipeHandle);
}
catch (ObjectDisposedException) { }
}
[DllImport("kernel32.dll")]
private static extern bool CancelIo(SafePipeHandle handle);

}

最后调整服务器以使用它:

    int bytes = await server.ReadPipeAsync(buffer, 0, 4, cancellationTokenSource.Token);

请注意,此解决方法特定于 Windows,因此无法在针对 Unix 风格的 .NETCore 程序中工作。然后考虑更重的锤子,在CancelPipeIo()方法中调用pipe.Close()。

关于c# - CancellationToken请求取消时NamedPipeServerStream.ReadAsync()不退出,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52632448/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com