gpt4 book ai didi

.net - 在 .NET 4.5 中限制并发任务的数量

转载 作者:行者123 更新时间:2023-12-04 01:33:42 24 4
gpt4 key购买 nike

观察以下函数:

public Task RunInOrderAsync<TTaskSeed>(IEnumerable<TTaskSeed> taskSeedGenerator,
CreateTaskDelegate<TTaskSeed> createTask,
OnTaskErrorDelegate<TTaskSeed> onError = null,
OnTaskSuccessDelegate<TTaskSeed> onSuccess = null) where TTaskSeed : class
{
Action<Exception, TTaskSeed> onFailed = (exc, taskSeed) =>
{
if (onError != null)
{
onError(exc, taskSeed);
}
};

Action<Task> onDone = t =>
{
var taskSeed = (TTaskSeed)t.AsyncState;
if (t.Exception != null)
{
onFailed(t.Exception, taskSeed);
}
else if (onSuccess != null)
{
onSuccess(t, taskSeed);
}
};

var enumerator = taskSeedGenerator.GetEnumerator();
Task task = null;
while (enumerator.MoveNext())
{
if (task == null)
{
try
{
task = createTask(enumerator.Current);
Debug.Assert(ReferenceEquals(task.AsyncState, enumerator.Current));
}
catch (Exception exc)
{
onFailed(exc, enumerator.Current);
}
}
else
{
task = task.ContinueWith((t, taskSeed) =>
{
onDone(t);
var res = createTask((TTaskSeed)taskSeed);
Debug.Assert(ReferenceEquals(res.AsyncState, taskSeed));
return res;
}, enumerator.Current).TaskUnwrap();
}
}

if (task != null)
{
task = task.ContinueWith(onDone);
}

return task;
}

在哪里 TaskUnwrap是标准 Task.Unwrap 的状态保存版本:
public static class Extensions
{
public static Task TaskUnwrap(this Task<Task> task, object state = null)
{
return task.Unwrap().ContinueWith((t, _) =>
{
if (t.Exception != null)
{
throw t.Exception;
}
}, state ?? task.AsyncState);
}
}
RunInOrderAsync方法允许异步运行 N 个任务,但顺序执行 - 一个接一个。实际上,它运行从给定种子创建的任务,并发限制为 1。

让我们假设 createTask 从种子创建的任务委托(delegate)不对应于多个并发任务。

现在,我想输入 maxConcurrencyLevel 参数,因此函数签名如下所示:
Task RunInOrderAsync<TTaskSeed>(int maxConcurrencyLevel,
IEnumerable<TTaskSeed> taskSeedGenerator,
CreateTaskDelegate<TTaskSeed> createTask,
OnTaskErrorDelegate<TTaskSeed> onError = null,
OnTaskSuccessDelegate<TTaskSeed> onSuccess = null) where TTaskSeed : class

在这里我有点卡住了。

SO有这样的问题:
  • System.Threading.Tasks - Limit the number of concurrent Tasks
  • Task based processing with a limit for concurrent task number with .NET 4.5 and c#
  • .Net TPL: Limited Concurrency Level Task scheduler with task priority?

  • 其中基本上提出了两种解决问题的方法:
  • 使用 Parallel.ForEachParallelOptions指定 MaxDegreeOfParallelism属性值等于所需的最大并发级别。
  • 使用自定义 TaskScheduler与所需的 MaximumConcurrencyLevel值(value)。

  • 第二种方法并没有减少它,因为涉及的所有任务都必须使用相同的任务调度程序实例。为此,所有用于返回 Task 的方法必须有一个接受自定义 TaskScheduler 的重载实例。不幸的是,微软在这方面并不是很一致。例如, SqlConnection.OpenAsync不接受这样的论点(但 TaskFactory.FromAsync 接受)。

    第一种方法意味着我必须将任务转换为操作,如下所示:
    () => t.Wait()

    我不确定这是一个好主意,但我很乐意就此获得更多意见。

    另一种方法是利用 TaskFactory.ContinueWhenAny ,但这很困惑。

    有任何想法吗?

    编辑 1

    我想澄清想要限制的原因。我们的任务最终对同一个 SQL 服务器执行 SQL 语句。我们想要的是一种限制并发传出 SQL 语句数量的方法。完全有可能从其他代码段同时执行其他 SQL 语句,但这是一个批处理器,可能会淹没服务器。

    现在,请注意,尽管我们谈论的是同一个 SQL 服务器,但同一台服务器上有许多数据库。因此,这并不是要限制到同一个数据库的打开 SQL 连接的数量,因为数据库可能根本不一样。

    这就是为什么像 ThreadPool.SetMaxThreads() 这样的末日解决方案无关紧要。

    现在,关于 SqlConnection.OpenAsync .它之所以异步是有原因的——它可能会往返于服务器,因此可能会受到网络延迟和分布式环境的其他可爱副作用的影响。因此,它与接受 TaskScheduler 的其他异步方法没有什么不同。范围。我倾向于认为不接受一个只是一个错误。

    编辑 2

    我想保留原始函数的异步精神。因此,我希望避免任何明确的阻塞解决方案。

    编辑 3

    感谢 @fsimonazzi's answer我现在有了所需功能的工作实现。这是代码:
            var sem = new SemaphoreSlim(maxConcurrencyLevel);
    var tasks = new List<Task>();

    var enumerator = taskSeedGenerator.GetEnumerator();
    while (enumerator.MoveNext())
    {
    tasks.Add(sem.WaitAsync().ContinueWith((_, taskSeed) =>
    {
    Task task = null;
    try
    {
    task = createTask((TTaskSeed)taskSeed);
    if (task != null)
    {
    Debug.Assert(ReferenceEquals(task.AsyncState, taskSeed));
    task = task.ContinueWith(t =>
    {
    sem.Release();
    onDone(t);
    });
    }
    }
    catch (Exception exc)
    {
    sem.Release();
    onFailed(exc, (TTaskSeed)taskSeed);
    }
    return task;
    }, enumerator.Current).TaskUnwrap());
    }

    return Task.Factory.ContinueWhenAll(tasks.ToArray(), _ => sem.Dispose());

    最佳答案

    这里已经有很多答案了。我想解决您在斯蒂芬斯回答中所做的评论,关于使用 TPL 数据流限制并发的示例。即使很难,您在这个问题的另一个答案中也留下了评论,您不再使用基于任务的方法,因为它可能会帮助其他人。

    使用 ActionBlock<T> 的示例因为这是:

    private static async Task DoStuff<T>(int maxConcurrency, IEnumerable<T> items, Func<T, Task> createTask)
    {
    var ab = new ActionBlock<T>(createTask, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxConcurrency });

    foreach (var item in items)
    {
    ab.Post(item);
    }

    ab.Complete();
    await ab.Completion;
    }

    关于 TPL 数据流的更多信息可以在这里找到: https://msdn.microsoft.com/en-us/library/system.threading.tasks.dataflow(v=vs.110).aspx

    关于.net - 在 .NET 4.5 中限制并发任务的数量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20355931/

    24 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com