gpt4 book ai didi

c# - ActionBlock Framework 4 rx 替代方案

转载 作者:太空宇宙 更新时间:2023-11-03 10:33:49 28 4
gpt4 key购买 nike

我对 Framework 4.0 的 ActionBlock 实现很感兴趣,因为 Framework 4.0 似乎不支持 TPL.Dataflow。更具体地说,我对接收 Func 委托(delegate)的构造函数和 MaxDegreeOfParallism = 1 的情况很感兴趣。

我考虑过使用响应式扩展来实现它,但我不确定该怎么做。考虑创建一个 Subject 并在 Post 上调用 OnNext,并使用 SelectMany 和任务 ToObservable 的东西,但我不确定如何处理调度程序。这是我的想法的草稿。

public class ActionBlock<TInput>
{
private readonly TaskCompletionSource<object> mCompletion = new TaskCompletionSource<object>();
private readonly Subject<TInput> mQueue = new Subject<TInput>();

public ActionBlock(Func<TInput, Task> action)
{
var observable =
from item in mQueue
from _ in action(item).ToObservable()
select _;

observable.Subscribe(x => { },
OnComplete);
}

private void OnComplete()
{
mCompletion.SetResult(null);
}

public void Post(TInput input)
{
mQueue.OnNext(input);
}

public Task Completion
{
get
{
return mCompletion.Task;
}
}

public void Complete()
{
mQueue.OnCompleted();
}
}

我想也许可以使用 EventLoopScheduler,但我不确定它是否适合这里,因为这是异步的。

有什么想法吗?

最佳答案

mQueue
.Select(input => Observable.FromAsync(() => action(input))
.Merge(maxDegreeOfParallelism)
.Subscribe(...);

如果确实 maxDegreeOfParallelism 始终为 1,则只需使用 Concat 而不是 Merge:

mQueue
.Select(input => Observable.FromAsync(() => action(input))
.Concat()
.Subscribe(...);

这是有效的,因为 FromAsync 只是创建一个冷的可观察对象,在它被订阅之前不会运行异步操作。然后我们使用 MergemaxConcurrency 参数(或只是 Concat)来限制并发订阅的数量(从而限制运行的异步操作的数量).

编辑:

并且由于您的目标只是拥有一个表示流完成的 Task,因此您可以使用 ToTask 而不是直接订阅。 ToTask 将订阅并返回一个具有最终值的 Task。因为 ToTask 将在 observable 不产生值时抛出,我们将使用 Count 来保证它产生一个值:

// task to mark completion
private readonly Task mCompletion;

// ...

this.mCompletion = mQueue
.Select(input => Observable.FromAsync(() => action(input))
.Concat()
.Count()
.ToTask();

关于c# - ActionBlock Framework 4 rx 替代方案,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28708400/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com