gpt4 book ai didi

c# - 使用异步操作的 TPL 数据流

转载 作者:太空狗 更新时间:2023-10-30 01:07:57 26 4
gpt4 key购买 nike

我正在通过移植一些旧的套接字代码来使用 TPL 数据流来试验 TPL 数据流,以使用 TPL 数据流和新的异步功能。尽管 API 感觉坚如磐石,但我的代码最终还是感觉很乱。我想知道我是否遗漏了什么。

我的要求如下:一个套接字类公开:Open、Close、Send 和 Receive 方法。所有返回一个任务,因此是异步的。打开和关闭是原子的。 Send 和 Receive 可以并排工作,尽管两者一次只能处理 1 个命令。

从逻辑上讲,这将我带到下一段内部控制代码:

// exposing an exclusive scheduler for connectivity related tasks and a parallel scheduler where send and receive can work with
private readonly ConcurrentExclusiveSchedulerPair exclusiveConnectionSchedulerPair;
private readonly ActionBlock<Action> connectionBlock;
private readonly ActionBlock<Action> sendBlock;
private readonly ActionBlock<Action> receiveBlock;

// within the constructor:
this.exclusiveConnectionSchedulerPair = new ConcurrentExclusiveSchedulerPair();
this.connectionBlock = new ActionBlock<Action>(action => action(), new ExecutionDataflowBlockOptions() { TaskScheduler = exclusiveConnectionSchedulerPair.ExclusiveScheduler });
this.sendBlock = new ActionBlock<Action>(action => action(), new ExecutionDataflowBlockOptions() { TaskScheduler = exclusiveConnectionSchedulerPair.ConcurrentScheduler });
this.receiveBlock = new ActionBlock<Action>(action => action(), new ExecutionDataflowBlockOptions() { TaskScheduler = exclusiveConnectionSchedulerPair.ConcurrentScheduler });

到目前为止一切顺利。我可以安全地将操作发送到发送和接收 block ,而不必同时担心正在运行的连接相关操作。此外,ActionBlock 确保同步多个发送调用(接收、关闭和打开同上)。

问题在于,没有简单的方法让操作将任务传回给发布者。现在我正在使用 TaskCompletionSource 来传达结果。喜欢:

public Task Send(ArraySegment<byte> buffer, CancellationToken cancellationToken)
{
TaskCompletionSource<object> resultCompletionSource = new TaskCompletionSource<object>();

sendBlock.Post(async () =>
{
if (!tcpClient.Connected)
throw new InvalidOperationException("Cant send when not open");
else
{
await sendStream.WriteAsync(buffer.Array, buffer.Offset, buffer.Count, cancellationToken);
resultCompletionSource.SetResult(null);
}
});

return resultCompletionSource.Task;
}

感觉丑陋和笨拙。我的问题是:有没有一种方法可以使用 TPL 来同步工作流,而无需使用 TaskCompletionSource 在两者之间进行通信?

谢谢!

最佳答案

首先,您根本不需要 TPL 数据流,因为您实际上没有任何数据流。

其次,像这样使用 TaskScheduler 也不是正确的解决方案。 TaskScheduler 调度 code,但是当您 await 某事时,没有代码运行。因此,当 WriteAsync() 执行其异步工作时,Open() 的代码可以运行。

你真正需要的是类似 ReaderWriterLock 的东西,但这适用于 async。框架中没有类似的东西,但您可以使用 Stephen Toub's article Building Async Coordination Primitives, Part 7: AsyncReaderWriterLock 中的代码,这正是您所需要的。这篇文章还更详细地解释了为什么使用 TaskScheduler 是错误的。

使用 AsyncReaderWriterLock,您的代码可能如下所示:

public async Task Send(ArraySegment<byte> buffer, CancellationToken cancellationToken)
{
using (await readerWriterLock.ReaderLockAsync())
{
if (!tcpClient.Connected)
throw new InvalidOperationException("Can't send when not open");

await sendStream.WriteAsync(buffer.Array, buffer.Offset, buffer.Count, cancellationToken);
}
}

关于c# - 使用异步操作的 TPL 数据流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/11000847/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com