gpt4 book ai didi

c# - TPL架构问题

转载 作者:太空狗 更新时间:2023-10-29 21:14:34 24 4
gpt4 key购买 nike

我目前正在开展一个项目,我们面临着并行处理项目的挑战。到目前为止没什么大不了的;)现在问题来了。我们有一个 ID 列表,我们定期(每 2 秒)为每个 ID 调用一个 StoredProcedure。需要单独检查每个项目的 2 秒,因为它们是在运行时添加和删除的。此外,我们要配置最大并行度,因为数据库不应该同时被 300 个线程淹没。正在处理的项目在完成上一次执行之前不应重新安排处理。原因是我们要防止排队很多项目,以防数据库出现延迟。

现在我们使用的是一个自主开发的组件,它有一个主线程,它会定期检查哪些项目需要安排处理。获得列表后,它会将这些列表放到基于 IOCP 的自定义线程池中,然后使用 waithandles 等待正在处理的项目。然后下一次迭代开始。 IOCP 因为它提供了工作窃取。

我想用 TPL/.NET 4 版本替换此自定义实现,并且我想知道您将如何解决它(最好是简单且易于阅读/维护)。我知道这篇文章:http://msdn.microsoft.com/en-us/library/ee789351.aspx ,但这只是限制了正在使用的线程数量。离开偷窃工作,定期执行项目....

理想情况下,它将成为一个通用组件,可用于一些需要定期为项目列表完成的所有任务。

欢迎任何输入,蒂亚马丁

最佳答案

我认为您实际上不需要对直接 TPL 感到沮丧 Tasks为了这。对于初学者,我会设置一个 BlockingCollection围绕 ConcurrentQueue (默认)没有 BoundedCapacityBlockingCollection 上设置以存储需要处理的 ID。

// Setup the blocking collection somewhere when your process starts up (OnStart for a Windows service)
BlockingCollection<string> idsToProcess = new BlockingCollection<string>();

从那里我将只使用 Parallel::ForEach关于从 BlockingCollection::GetConsumingEnumerable 返回的枚举.在 ForEach 调用中,您将设置您的 ParallelOptions::MaxDegreeOfParallelismForEach 的主体内,您将执行存储过程。

现在,一旦存储过程执行完成,您就是说您不想重新安排执行至少 两秒钟。没问题,安排一个System.Threading.Timer带有一个回调,该回调将简单地将 ID 添加回提供的回调中的 BlockingCollection

Parallel.ForEach(
idsToProcess.GetConsumingEnumerable(),
new ParallelOptions
{
MaxDegreeOfParallelism = 4 // read this from config
},
(id) =>
{
// ... execute sproc ...

// Need to declare/assign this before the delegate so that we can dispose of it inside
Timer timer = null;

timer = new Timer(
_ =>
{
// Add the id back to the collection so it will be processed again
idsToProcess.Add(id);

// Cleanup the timer
timer.Dispose();
},
null, // no state, id wee need is "captured" in the anonymous delegate
2000, // probably should read this from config
Timeout.Infinite);
}

最后,当进程关闭时,您将调用 BlockingCollection::CompleteAdding这样正在处理的可枚举停止阻塞并完成并且 Parallel::ForEach 将退出。例如,如果这是 Windows 服务,您将在 OnStop 中执行此操作.

// When ready to shutdown you just signal you're done adding
idsToProcess.CompleteAdding();

更新

您在评论中提出了一个合理的担忧,即您可能在任何给定点处理大量 ID,并且担心每个 ID 的计时器会产生过多的开销。我绝对同意这一点。因此,在您同时处理大量 ID 的情况下,我会从使用每个 ID 计时器更改为使用另一个队列来保存由单个短间隔计时器监视的“ sleep ”ID。首先,您需要一个 ConcurrentQueue 来放置休眠的 ID:

ConcurrentQueue<Tuple<string, DateTime>> sleepingIds = new ConcurrentQueue<Tuple<string, DateTime>>();

现在,我使用由两部分组成的 Tuple此处用于说明目的,但您可能希望为其创建一个更强类型的结构(或至少使用 using 语句为其创建别名)以提高可读性。该元组具有 id 和一个 DateTime,表示它何时被放入队列。

现在您还需要设置监控此队列的计时器:

Timer wakeSleepingIdsTimer = new Timer(
_ =>
{
DateTime utcNow = DateTime.UtcNow;

// Pull all items from the sleeping queue that have been there for at least 2 seconds
foreach(string id in sleepingIds.TakeWhile(entry => (utcNow - entry.Item2).TotalSeconds >= 2))
{
// Add this id back to the processing queue
idsToProcess.Enqueue(id);
}
},
null, // no state
Timeout.Infinite, // no due time
100 // wake up every 100ms, probably should read this from config
);

那么您只需更改 Parallel::ForEach 即可执行以下操作,而不是为每个设置一个计时器:

(id) =>
{
// ... execute sproc ...

sleepingIds.Enqueue(Tuple.Create(id, DateTime.UtcNow));
}

关于c# - TPL架构问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/6308225/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com