- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
我目前正在开展一个项目,我们面临着并行处理项目的挑战。到目前为止没什么大不了的;)现在问题来了。我们有一个 ID 列表,我们定期(每 2 秒)为每个 ID 调用一个 StoredProcedure。需要单独检查每个项目的 2 秒,因为它们是在运行时添加和删除的。此外,我们要配置最大并行度,因为数据库不应该同时被 300 个线程淹没。正在处理的项目在完成上一次执行之前不应重新安排处理。原因是我们要防止排队很多项目,以防数据库出现延迟。
现在我们使用的是一个自主开发的组件,它有一个主线程,它会定期检查哪些项目需要安排处理。获得列表后,它会将这些列表放到基于 IOCP 的自定义线程池中,然后使用 waithandles 等待正在处理的项目。然后下一次迭代开始。 IOCP 因为它提供了工作窃取。
我想用 TPL/.NET 4 版本替换此自定义实现,并且我想知道您将如何解决它(最好是简单且易于阅读/维护)。我知道这篇文章:http://msdn.microsoft.com/en-us/library/ee789351.aspx ,但这只是限制了正在使用的线程数量。离开偷窃工作,定期执行项目....
理想情况下,它将成为一个通用组件,可用于一些需要定期为项目列表完成的所有任务。
欢迎任何输入,蒂亚马丁
最佳答案
我认为您实际上不需要对直接 TPL 感到沮丧 Tasks
为了这。对于初学者,我会设置一个 BlockingCollection
围绕 ConcurrentQueue
(默认)没有 BoundedCapacity
在 BlockingCollection
上设置以存储需要处理的 ID。
// Setup the blocking collection somewhere when your process starts up (OnStart for a Windows service)
BlockingCollection<string> idsToProcess = new BlockingCollection<string>();
从那里我将只使用 Parallel::ForEach
关于从 BlockingCollection::GetConsumingEnumerable
返回的枚举.在 ForEach
调用中,您将设置您的 ParallelOptions::MaxDegreeOfParallelism
在 ForEach
的主体内,您将执行存储过程。
现在,一旦存储过程执行完成,您就是说您不想重新安排执行至少 两秒钟。没问题,安排一个System.Threading.Timer
带有一个回调,该回调将简单地将 ID 添加回提供的回调中的 BlockingCollection
。
Parallel.ForEach(
idsToProcess.GetConsumingEnumerable(),
new ParallelOptions
{
MaxDegreeOfParallelism = 4 // read this from config
},
(id) =>
{
// ... execute sproc ...
// Need to declare/assign this before the delegate so that we can dispose of it inside
Timer timer = null;
timer = new Timer(
_ =>
{
// Add the id back to the collection so it will be processed again
idsToProcess.Add(id);
// Cleanup the timer
timer.Dispose();
},
null, // no state, id wee need is "captured" in the anonymous delegate
2000, // probably should read this from config
Timeout.Infinite);
}
最后,当进程关闭时,您将调用 BlockingCollection::CompleteAdding
这样正在处理的可枚举停止阻塞并完成并且 Parallel::ForEach 将退出。例如,如果这是 Windows 服务,您将在 OnStop
中执行此操作.
// When ready to shutdown you just signal you're done adding
idsToProcess.CompleteAdding();
更新
您在评论中提出了一个合理的担忧,即您可能在任何给定点处理大量 ID,并且担心每个 ID 的计时器会产生过多的开销。我绝对同意这一点。因此,在您同时处理大量 ID 的情况下,我会从使用每个 ID 计时器更改为使用另一个队列来保存由单个短间隔计时器监视的“ sleep ”ID。首先,您需要一个 ConcurrentQueue
来放置休眠的 ID:
ConcurrentQueue<Tuple<string, DateTime>> sleepingIds = new ConcurrentQueue<Tuple<string, DateTime>>();
现在,我使用由两部分组成的 Tuple
此处用于说明目的,但您可能希望为其创建一个更强类型的结构(或至少使用 using
语句为其创建别名)以提高可读性。该元组具有 id 和一个 DateTime,表示它何时被放入队列。
现在您还需要设置监控此队列的计时器:
Timer wakeSleepingIdsTimer = new Timer(
_ =>
{
DateTime utcNow = DateTime.UtcNow;
// Pull all items from the sleeping queue that have been there for at least 2 seconds
foreach(string id in sleepingIds.TakeWhile(entry => (utcNow - entry.Item2).TotalSeconds >= 2))
{
// Add this id back to the processing queue
idsToProcess.Enqueue(id);
}
},
null, // no state
Timeout.Infinite, // no due time
100 // wake up every 100ms, probably should read this from config
);
那么您只需更改 Parallel::ForEach
即可执行以下操作,而不是为每个设置一个计时器:
(id) =>
{
// ... execute sproc ...
sleepingIds.Enqueue(Tuple.Create(id, DateTime.UtcNow));
}
关于c# - TPL架构问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/6308225/
是否可以简化在裸机上运行的这条链: 具有随时间变化的副本数的 StatefulSet 服务 使用 proxy-next-upstream: "error http_502 timeout invali
很难说出这里要问什么。这个问题模棱两可、含糊不清、不完整、过于宽泛或夸夸其谈,无法以目前的形式得到合理的回答。如需帮助澄清此问题以便重新打开,visit the help center . 关闭 1
我需要为应用程序制定架构。它专为销售产品而设计。 系统每天将接受大约 30-40k 的新产品。它将导致在表 product 中创建新记录。 系统应保留价格历史记录。用户应该能够看到产品 A 的价格在去
我需要一些帮助来理解 PHP 的内部工作原理。 还记得,在过去,我们曾经写过 TSR(Terminate and stay resident)例程(pre-windows 时代)吗?一旦该程序被执行,
1.Nginx 基础架构 nginx 启动后以 daemon 形式在后台运行,后台进程包含一个 master 进程和多个 worker 进程。如下图所示: master与
本文深入探讨了Kubernetes(K8s)的关键方面,包括其架构、容器编排、网络与存储管理、安全与合规、高可用性、灾难恢复以及监控与日志系统。 关注【TechLeadCloud】,
我知道 CNN 的工作原理,包括每一层的用途(Dropout、Pooling 等)。但是,在为新数据集设计 CNN 时,我不知道要使用多少个 Conv-Relu-Pool 层,在最终获得输出之前我应该
在基于 REST 的架构中,资源和方法之间有什么区别。有吗? 最佳答案 资源是您的应用程序定义的东西;它们与物体非常相似。方法是 HTTP 动词之一,例如 GET、POST、PUT、DELETE。它们
我想用 oneOf仅在 xyType 的值上不同的模式属性(property)。我想要其中两个:一个是 xyType设置为 "1"第二个在哪里xyType是 任何其他值 .这可以使用 json 模式完
寻求 PHP 架构师的建议! 我对 PHP 不是很熟悉,但已经接管了一个用该语言编写的大型分析包的维护工作。该架构旨在将报告的数据读取到大型键/值数组中,这些数组通过各种解析模块传递,以提取每个模块已
这些存在吗? 多年来,我一直是大型强类型面向对象语言(Java 和 C#)的奴隶,并且是 Martin Fowler 及其同类的信徒。 Javascript,由于它的松散类型和函数性质,似乎不适合我习
我已经阅读了 Manning 的 Big Data Lambda Architecture ( http://www.manning.com/marz/BD_meap_ch01.pdf ),但仍然无法
在过去的几年里,我做了相当多的 iOS 开发,所以我非常熟悉 iOS 架构和应用程序设计(一切都是一个 ViewController,您可以将其推送、弹出或粘贴到选项卡栏中)。我最近开始探索正确的 M
我有以下应用程序,我在其中循环一些数据并显示它。 {{thing.title}} {{thing.description}}
昨天我和我的伙伴讨论了我正在开发的这个电子购物网站的架构。请注意,我为此使用 ASP.NET。他非常惊讶地发现我没有将添加到购物车的项目保留在 ArrayList 或其他通用列表中,而是使用 LINQ
我正在使用在 tridion 蓝图层次结构中处于较低位置的出版物。从蓝图中较高级别的出版物继承的一些内容和模式不适合我的出版物,并且永远不会被我的出版物使用。 我将跟进添加这些项目的内部团队,并尝试说
我目前已经在 Cassandra 中设计了一个架构,但我想知道是否有更好的方法来做事情。基本上,问题在于大多数(如果不是全部)读取都是动态的。我构建了一个分段系统作为应用程序服务,读取动态自定义查询(
我正在按照 documentation 中给出的 icingaweb UI v 2.0 布局执行在服务器上设置 icinga 的步骤。 。我成功进入设置页面,该页面要求您输入 token ,然后按照步
我必须保存来自不同社交媒体的用户的不同个人资料。例如用户可能有 1 个 Facebook 和 2 个 Twitter 个人资料。如果我保存每个配置文件它作为新文档插入不同的集合中,例如 faceboo
我的团队使用 Puppet 架构,该架构目前可在多个环境(流浪者、暂存、生产)中容纳单个应用程序。 我们现在想要扩展此设置的范围以支持其他应用程序。他们中的许多人将使用我们已经定义的现有模块的子集,而
我是一名优秀的程序员,十分优秀!