- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我正在试验 CCR 迭代器作为一项任务的解决方案,该任务需要并行处理大量数据馈送,其中来自每个馈送的数据需要按顺序处理。所有 Feed 都不相互依赖,因此每个 Feed 都可以并行进行按顺序处理。
下面是一个带有一个整数馈送的快速但肮脏的模型,它只是以大约 1.5K/秒的速率将整数插入端口,然后使用 CCR 迭代器将它们拉出以保持按顺序处理保证。
class Program
{
static Dispatcher dispatcher = new Dispatcher();
static DispatcherQueue dispatcherQueue =
new DispatcherQueue("DefaultDispatcherQueue", dispatcher);
static Port<int> intPort = new Port<int>();
static void Main(string[] args)
{
Arbiter.Activate(
dispatcherQueue,
Arbiter.FromIteratorHandler(new IteratorHandler(ProcessInts)));
int counter = 0;
Timer t = new Timer( (x) =>
{ for(int i = 0; i < 1500; ++i) intPort.Post(counter++);}
, null, 0, 1000);
Console.ReadKey();
}
public static IEnumerator<ITask> ProcessInts()
{
while (true)
{
yield return intPort.Receive();
int currentValue;
if( (currentValue = intPort) % 1000 == 0)
{
Console.WriteLine("{0}, Current Items In Queue:{1}",
currentValue, intPort.ItemCount);
}
}
}
}
令我感到非常惊讶的是,CCR 无法跟上 Corei7 机器的速度,队列大小无限制地增长。在另一个测量负载或 ~100 Post/sec 下从 Post() 到 Receive() 的延迟的测试中,每个批处理中第一个 Post() 和 Receive() 之间的延迟约为 1 毫秒。
我的模型有问题吗?如果是这样,使用 CCR 执行此操作的更好方法是什么?
最佳答案
是的,我同意,这确实看起来很奇怪。您的代码最初似乎执行得很顺利,但在几千项之后,处理器使用率上升到性能确实乏善可陈的地步。这让我感到不安,并表明框架中存在问题。在玩了你的代码之后,我真的无法确定为什么会这样。我建议将此问题提交给 Microsoft Robotics Forums看看你能否让 George Chrysanthakopoulos(或其他 CCR 的大脑之一)告诉你问题出在哪里。但是,我可以推测您的代码目前的效率非常低。
您处理从港口“弹出”元素的方式非常低效。本质上,每当端口中有一条消息时,迭代器就会被唤醒,它只处理一条消息(尽管端口中可能有数百条消息),然后卡在 yield
上,同时控制被传递回框架。当 yielded receiver 引起迭代器的另一次“唤醒”时,许多消息已经填满了 Port。从 Dispatcher 拉出一个线程来只处理一个项目(当许多项目同时堆积时)几乎肯定不是获得良好吞吐量的最佳方式。
我修改了您的代码,以便在 yield 之后,我们检查端口以查看是否还有任何其他消息排队并处理它们,从而在我们返回框架之前完全清空端口。我还稍微重构了您的代码以使用 CcrServiceBase
,这简化了您正在执行的某些任务的语法:
internal class Test:CcrServiceBase
{
private readonly Port<int> intPort = new Port<int>();
private Timer timer;
public Test() : base(new DispatcherQueue("DefaultDispatcherQueue",
new Dispatcher(0,
"dispatcher")))
{
}
public void StartTest() {
SpawnIterator(ProcessInts);
var counter = 0;
timer = new Timer(x =>
{
for (var i = 0; i < 1500; ++i)
intPort.Post(counter++);
}
,
null,
0,
1000);
}
public IEnumerator<ITask> ProcessInts()
{
while (true)
{
yield return intPort.Receive();
int currentValue = intPort;
ReportCurrent(currentValue);
while(intPort.Test(out currentValue))
{
ReportCurrent(currentValue);
}
}
}
private void ReportCurrent(int currentValue)
{
if (currentValue % 1000 == 0)
{
Console.WriteLine("{0}, Current Items In Queue:{1}",
currentValue,
intPort.ItemCount);
}
}
}
或者,您可以完全取消迭代器,因为它在您的示例中并没有得到很好的使用(尽管我不完全确定这对处理顺序有什么影响):
internal class Test : CcrServiceBase
{
private readonly Port<int> intPort = new Port<int>();
private Timer timer;
public Test() : base(new DispatcherQueue("DefaultDispatcherQueue",
new Dispatcher(0,
"dispatcher")))
{
}
public void StartTest()
{
Activate(
Arbiter.Receive(true,
intPort,
i =>
{
ReportCurrent(i);
int currentValue;
while (intPort.Test(out currentValue))
{
ReportCurrent(currentValue);
}
}));
var counter = 0;
timer = new Timer(x =>
{
for (var i = 0; i < 500000; ++i)
{
intPort.Post(counter++);
}
}
,
null,
0,
1000);
}
private void ReportCurrent(int currentValue)
{
if (currentValue % 1000000 == 0)
{
Console.WriteLine("{0}, Current Items In Queue:{1}",
currentValue,
intPort.ItemCount);
}
}
}
这两个示例都将吞吐量显着提高了几个数量级。希望这会有所帮助。
关于c# - 使用 CCR 有序处理事件的有效方法是什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/5399365/
Microsoft 的并发和协调运行时确实挽救了一个遇到严重死锁问题的项目。从那时起,我发现我越来越频繁地使用它来处理几乎所有需要异步编码产生比以前更轻、更快的结果的事情。老实说,它改变了我对多线程/
我最近一直在学习 Concurrency and Coordination Runtime (CCR). 的进出 为这种相对较新的技术找到好的学习资源非常困难。 (快速谷歌搜索将“Creedence
所以我被一个 friend 问了一个汇编问题,他也在努力学习和理解这门语言。他问: 给定以下寄存器值: D0: 364B 421E D1: F3FC 9066 指令执行后: cmp.w D0,D
我正在试验 CCR 迭代器作为一项任务的解决方案,该任务需要并行处理大量数据馈送,其中来自每个馈送的数据需要按顺序处理。所有 Feed 都不相互依赖,因此每个 Feed 都可以并行进行按顺序处理。 下
关注这个解决方案Using the CCR with ASynchronous WCF Service 为什么需要这样做: ThreadPool.QueueUserWorkItem(s => call
我确实在手册中找到了一条来自 CCR 指令的 MOVE... http://www.freescale.com/files/archives/doc/ref_manual/M68000PRM.pdf
使用 CCR EXIF http://delphihaven.wordpress.com/ccr-exif/如何获取拍摄图像的时间? (我愿意使用其他东西,只要我可以使用 Delphi,但每个人都说
就目前情况而言,这个问题不太适合我们的问答形式。我们希望答案得到事实、引用资料或专业知识的支持,但这个问题可能会引发辩论、争论、民意调查或扩展讨论。如果您觉得这个问题可以改进并可能重新开放,visit
我想知道是否支持 elastic search cross cluster replication在 AWS 和 Azure 上?我明白了AWS announcement其中说他们将支持跨集群搜索(但
我是一名优秀的程序员,十分优秀!