- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我试图找到一种方法来在多个线程中处理队列,动态调整消费者的数量。基本上任务是众所周知的:多个生产者创建消息并将它们提交到队列中,多个消费者处理来自队列的消息。现在,我考虑了使用 System.Collections.Queue.Synchronized、System.Collections.Concurrent.ConcurrentQueue 和 System.Collections.Concurrent.BlockingCollection 等不同组件的不同方法,但我无法决定如何正确地使用最大效率,所以我很高兴通过您的输入收到一些绝妙的想法。
以下是更多详细信息:
这就是想法。现在,我考虑将 ConcurrentQueue 包装到一个类中,该类将封装 Enqueue 方法,并在入队后检查消息数量,并决定是否启动额外的消费者。并且消费者应该在循环中进行检查,以决定是否停止它。我认为您会提出一些更有趣的解决方案。
顺便说一句,理论上我仍然不知道如何处理的情况之一是当最后一条消息正在排队时,同时最后一个消费者已经停止。另一种情况也与停止有关——如果多个消费者同时进行停止检查,他们将被停止。我应该如何处理这些情况?
为了证明我的意思,请考虑以下示例:
class MessageController
{
private BlockingCollection<IMessage> messageQueue = new BlockingCollection<IMessage>();
int amountOfConsumers;
public void Enqueue(IMessage message)
{
messageQueue.Add(message); // point two
if (Math.Floor((double)messageQueue.Count / 100)+1 > amountOfConsumers) // point three
{
Task.Factory.StartNew(() =>
{
IMessage msg;
while ((messageQueue.Count > 0) && (Math.Floor((double)((messageQueue.Count + 50) / 100)) + 1 >= amountOfConsumers)) //point one
{
msg = messageQueue.Take();
//process msg...
}
ConsumerQuit(); // point four
});
Interlocked.Increment(ref amountOfConsumers);
}
}
public void ConsumerQuit()
{
Interlocked.Decrement(ref amountOfConsumers);
}
}
所以现在当我可以指出具体的代码行时,这些就是问题:
ConsumerTask | LastMessageThread
------------------------------------------------------
@point one(messageQueue.Count=0) | @point two
no time | @point three(amountOfConsumers=1)
@point four | ended;
ended; | ended;
ConsumerTask1 | ConsumerTask2| ConsumerTask3 | ConsumerTask4|
------------------------------------------------------------------------------
@point one(.Count=249;amount=4)| no time | no time | @point one |
no time | @point one | processing msg| @point four |
@point four | no time | @point one | ended; |
ended; | @point four | processing msg| ended; |
ended; | ended; | ... | ended; |Here, in case when the last message is already enqueued, we have one consumer task left that has to handle 249 messages alone, however the worst case can be if all them will halt, after the last message, potentialy hundreds of messages will stuck.
最佳答案
看来我终于想出了一个解决方案,但不确定性能如何。请考虑以下代码,任何反馈将不胜感激!我仍然希望看到一些其他的解决方案或想法,即使它们将完全不同并且需要在方法上进行重大改变。这是目标:“一种在多个线程中处理队列,动态调整消费者数量的方法”
class MessageController
{
private BlockingCollection<IMessage> messageQueue = new BlockingCollection<IMessage>();
private ManualResetEvent mre = new ManualResetEvent(true);
private int amountOfConsumers;
object o = new object();
public void Enqueue(IMessage message)
{
messageQueue.Add(message);
mre.WaitOne();
if (Math.Floor((double)messageQueue.Count / 100)+1 > amountOfConsumers)
{
Interlocked.Increment(ref amountOfConsumers);
var task = Task.Factory.StartNew(() =>
{
IMessage msg;
bool repeat = true;
while (repeat)
{
while ((messageQueue.Count > 0) && (Math.Floor((double)((messageQueue.Count + 50) / 100)) + 1 >= amountOfConsumers))
{
msg = messageQueue.Take();
//process msg...
}
lock (o)
{
mre.Reset();
if ((messageQueue.Count == 0) || (Math.Ceiling((double)((messageQueue.Count + 51) / 100)) < amountOfConsumers))
{
ConsumerQuit();
repeat = false;
}
mre.Set();
}
}
});
}
}
public void ConsumerQuit()
{
Interlocked.Decrement(ref amountOfConsumers);
}
}
关于c# - 队列的并行处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/15169429/
有没有办法同时运行 2 个不同的代码块。我一直在研究 R 中的并行包,它们似乎都基于在循环中运行相同的函数。我正在寻找一种同时运行不同函数的方法(循环的 1 次迭代)。例如,我想在某个数据对象上创建一
无论如何增加 Parallel.For 启动后的循环次数?示例如下: var start = 0; var end = 5; Parallel.For(start, end, i => { C
我是 Golang 的新手,正在尝试了解并发和并行。我阅读了下面提到的关于并发和并行的文章。我执行了相同的程序。但没有得到相同的(混合字母和字符)输出。首先获取所有字母,然后获取字符。似乎并发不工作,
我正在寻找同时迭代 R 中两个或多个字符向量/列表的方法,例如。有没有办法做这样的事情: foo <- c('a','c','d') bar <- c('aa','cc','dd') for(i in
我对 Raku 很陌生,我对函数式方法有疑问,尤其是 reduce。 我最初有这样的方法: sub standardab{ my $mittel = mittel(@_); my $foo =
我最近花了很多时间来学习实时音频处理的细节,我发现的大多数库/工具都是c / c++代码或脚本/图形语言的形式,并在其中编译了c / c++代码。引擎盖。 使用基于回调的API,与GUI或App中的其
我正在使用 JMeter 进行图像负载测试。我有一个图像名称数组并遍历该数组,我通过 HTTP 请求获取所有图像。 -> loop_over_image - for loop controller
我整个晚上都在困惑这个问题...... makeflags = ['--prefix=/usr','--libdir=/usr/lib'] rootdir='/tmp/project' ps = se
我正在尝试提高计算图像平均值的方法的性能。 为此,我使用了两个 For 语句来迭代所有图像,因此我尝试使用一个 Parallel For 来改进它,但结果并不相同。 我做错了吗?或者是什么导致了差异?
假设您有一个并行 for 循环实现,例如ConcRT parallel_for,将所有工作放在一个 for 循环体内总是最好的吗? 举个例子: for(size_t i = 0; i < size()
我想并行运行一部分代码。目前我正在使用 Parallel.For 如何让10、20或40个线程同时运行 我当前的代码是: Parallel.For(1, total, (ii) =>
我使用 PAY API 进行了 PayPal 自适应并行支付,其中无论用户(买家)购买什么,都假设用户购买了总计 100 美元的商品。在我的自适应并行支付中,有 2 个接收方:Receiver1 和
我正在考虑让玩家加入游戏的高效算法。由于会有大量玩家,因此算法应该是异步的(即可扩展到集群中任意数量的机器)。有细节:想象有一个无向图(每个节点都是一个玩家)。玩家之间的每条边意味着玩家可以参加同一场
我有一个全局变量 volatile i = 0; 和两个线程。每个都执行以下操作: i++; System.out.print(i); 我收到以下组合。 12、21 和 22。 我理解为什么我没有得到
我有以下称为 pgain 的方法,它调用我试图并行化的方法 dist: /***************************************************************
我有一个 ruby 脚本读取一个巨大的表(约 2000 万行),进行一些处理并将其提供给 Solr 用于索引目的。这一直是我们流程中的一大瓶颈。我打算在这里加快速度,我想实现某种并行性。我对 Ru
我正在研究 Golang 并遇到一个问题,我已经研究了几天,我似乎无法理解 go routines 的概念以及它们的使用方式。 基本上我是在尝试生成数百万条随机记录。我有生成随机数据的函数,并将创建一
我希望 for 循环使用 go 例程并行。我尝试使用 channel ,但没有用。我的主要问题是,我想在继续之前等待所有迭代完成。这就是为什么在它不起作用之前简单地编写 go 的原因。我尝试使用 ch
我正在使用 import Control.Concurrent.ParallelIO.Global main = parallel_ (map processI [1..(sdNumber runPa
我正在尝试通过 makePSOCKcluster 连接到另一台计算机: library(parallel) cl ... doTryCatch -> recvData -> makeSOCKm
我是一名优秀的程序员,十分优秀!