- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
我想以并行方式处理一个事件。我的想法是将每个回调添加到 ThreadPool,有效地让每个方法注册由 ThreadPool 处理的事件。
我的试用代码如下所示:
Delegate[] delegates = myEvent.GetInvocationList();
IAsyncResult[] results = new IAsyncResult[ delegates.Count<Delegate>() ];
for ( int i = 0; i < delegates.Count<Delegate>(); i++ )
{
IAsyncResult result = ( ( TestDelegate )delegates[ i ] ).BeginInvoke( "BeginInvoke/EndInvoke", null, null );
results[ i ] = result;
}
for ( int i = 0; i < delegates.Length; i++ )
{
( ( TestDelegate )delegates[ i ] ).EndInvoke( results[ i ] );
}
这只是为了玩玩,因为我很好奇如何去做。我相信有更好的方法来做到这一点。
我不喜欢 Func 创建一个包含 lambda 的 WaitCallback。此外,与直接调用委托(delegate)相比,DynamicInvoke 非常慢。我怀疑这种处理事件的方式是否比按顺序处理更快。
我的问题是:如何以并行方式处理事件,最好是使用 ThreadPool?
因为我通常使用 Mono,所以 .NET 4.0 或任务并行库都不是一个选项。
谢谢!
编辑:- 由于 Earwickers 的回答更正了示例。- 更新试用代码
最佳答案
我会选择一种使用 DynamicMethod (LCG) 和状态对象的方法,该对象携带参数并跟踪调用(以便您可以等待它们完成)。
代码:应该做这样的事情(虽然还没有经过全面测试,因此在某些情况下可能会抛出一些讨厌的异常):
/// <summary>
/// Class for dynamic parallel invoking of a MulticastDelegate.
/// (C) 2009 Arsène von Wyss, avw@gmx.ch
/// No warranties of any kind, use at your own risk. Copyright notice must be kept in the source when re-used.
/// </summary>
public static class ParallelInvoke {
private class ParallelInvokeContext<TDelegate> where TDelegate: class {
private static readonly DynamicMethod invoker;
private static readonly Type[] parameterTypes;
static ParallelInvokeContext() {
if (!typeof(Delegate).IsAssignableFrom(typeof(TDelegate))) {
throw new InvalidOperationException("The TDelegate type must be a delegate");
}
Debug.Assert(monitor_enter != null, "Could not find the method Monitor.Enter()");
Debug.Assert(monitor_pulse != null, "Could not find the method Monitor.Pulse()");
Debug.Assert(monitor_exit != null, "Could not find the method Monitor.Exit()");
FieldInfo parallelInvokeContext_activeCalls = typeof(ParallelInvokeContext<TDelegate>).GetField("activeCalls", BindingFlags.Instance|BindingFlags.NonPublic);
Debug.Assert(parallelInvokeContext_activeCalls != null, "Could not find the private field ParallelInvokeContext.activeCalls");
FieldInfo parallelInvokeContext_arguments = typeof(ParallelInvokeContext<TDelegate>).GetField("arguments", BindingFlags.Instance|BindingFlags.NonPublic);
Debug.Assert(parallelInvokeContext_arguments != null, "Could not find the private field ParallelInvokeContext.arguments");
MethodInfo delegate_invoke = typeof(TDelegate).GetMethod("Invoke", BindingFlags.Instance|BindingFlags.Public);
Debug.Assert(delegate_invoke != null, string.Format("Could not find the method {0}.Invoke()", typeof(TDelegate).FullName));
if (delegate_invoke.ReturnType != typeof(void)) {
throw new InvalidOperationException("The TDelegate delegate must not have a return value");
}
ParameterInfo[] parameters = delegate_invoke.GetParameters();
parameterTypes = new Type[parameters.Length];
invoker = new DynamicMethod(string.Format("Invoker<{0}>", typeof(TDelegate).FullName), typeof(void), new[] {typeof(ParallelInvokeContext<TDelegate>), typeof(object)},
typeof(ParallelInvokeContext<TDelegate>), true);
ILGenerator il = invoker.GetILGenerator();
LocalBuilder args = (parameters.Length > 2) ? il.DeclareLocal(typeof(object[])) : null;
bool skipLoad = false;
il.BeginExceptionBlock();
il.Emit(OpCodes.Ldarg_1); // the delegate we are going to invoke
if (args != null) {
Debug.Assert(args.LocalIndex == 0);
il.Emit(OpCodes.Ldarg_0);
il.Emit(OpCodes.Ldfld, parallelInvokeContext_arguments);
il.Emit(OpCodes.Dup);
il.Emit(OpCodes.Stloc_0);
skipLoad = true;
}
foreach (ParameterInfo parameter in parameters) {
if (parameter.ParameterType.IsByRef) {
throw new InvalidOperationException("The TDelegate delegate must note have out or ref parameters");
}
parameterTypes[parameter.Position] = parameter.ParameterType;
if (args == null) {
il.Emit(OpCodes.Ldarg_0);
il.Emit(OpCodes.Ldfld, parallelInvokeContext_arguments);
} else if (skipLoad) {
skipLoad = false;
} else {
il.Emit(OpCodes.Ldloc_0);
}
il.Emit(OpCodes.Ldc_I4, parameter.Position);
il.Emit(OpCodes.Ldelem_Ref);
if (parameter.ParameterType.IsValueType) {
il.Emit(OpCodes.Unbox_Any, parameter.ParameterType);
}
}
il.Emit(OpCodes.Callvirt, delegate_invoke);
il.BeginFinallyBlock();
il.Emit(OpCodes.Ldarg_0);
il.Emit(OpCodes.Call, monitor_enter);
il.Emit(OpCodes.Ldarg_0);
il.Emit(OpCodes.Dup);
il.Emit(OpCodes.Ldfld, parallelInvokeContext_activeCalls);
il.Emit(OpCodes.Ldc_I4_1);
il.Emit(OpCodes.Sub);
il.Emit(OpCodes.Dup);
Label noPulse = il.DefineLabel();
il.Emit(OpCodes.Brtrue, noPulse);
il.Emit(OpCodes.Stfld, parallelInvokeContext_activeCalls);
il.Emit(OpCodes.Ldarg_0);
il.Emit(OpCodes.Call, monitor_pulse);
Label exit = il.DefineLabel();
il.Emit(OpCodes.Br, exit);
il.MarkLabel(noPulse);
il.Emit(OpCodes.Stfld, parallelInvokeContext_activeCalls);
il.MarkLabel(exit);
il.Emit(OpCodes.Ldarg_0);
il.Emit(OpCodes.Call, monitor_exit);
il.EndExceptionBlock();
il.Emit(OpCodes.Ret);
}
[Conditional("DEBUG")]
private static void VerifyArgumentsDebug(object[] args) {
for (int i = 0; i < parameterTypes.Length; i++) {
if (args[i] == null) {
if (parameterTypes[i].IsValueType) {
throw new ArgumentException(string.Format("The parameter {0} cannot be null, because it is a value type", i));
}
} else if (!parameterTypes[i].IsAssignableFrom(args[i].GetType())) {
throw new ArgumentException(string.Format("The parameter {0} is not compatible", i));
}
}
}
private readonly object[] arguments;
private readonly WaitCallback invokeCallback;
private int activeCalls;
public ParallelInvokeContext(object[] args) {
if (parameterTypes.Length > 0) {
if (args == null) {
throw new ArgumentNullException("args");
}
if (args.Length != parameterTypes.Length) {
throw new ArgumentException("The parameter count does not match");
}
VerifyArgumentsDebug(args);
arguments = args;
} else if ((args != null) && (args.Length > 0)) {
throw new ArgumentException("This delegate does not expect any parameters");
}
invokeCallback = (WaitCallback)invoker.CreateDelegate(typeof(WaitCallback), this);
}
public void QueueInvoke(Delegate @delegate) {
Debug.Assert(@delegate is TDelegate);
activeCalls++;
ThreadPool.QueueUserWorkItem(invokeCallback, @delegate);
}
}
private static readonly MethodInfo monitor_enter;
private static readonly MethodInfo monitor_exit;
private static readonly MethodInfo monitor_pulse;
static ParallelInvoke() {
monitor_enter = typeof(Monitor).GetMethod("Enter", BindingFlags.Static|BindingFlags.Public, null, new[] {typeof(object)}, null);
monitor_pulse = typeof(Monitor).GetMethod("Pulse", BindingFlags.Static|BindingFlags.Public, null, new[] {typeof(object)}, null);
monitor_exit = typeof(Monitor).GetMethod("Exit", BindingFlags.Static|BindingFlags.Public, null, new[] {typeof(object)}, null);
}
public static void Invoke<TDelegate>(TDelegate @delegate) where TDelegate: class {
Invoke(@delegate, null);
}
public static void Invoke<TDelegate>(TDelegate @delegate, params object[] args) where TDelegate: class {
if (@delegate == null) {
throw new ArgumentNullException("delegate");
}
ParallelInvokeContext<TDelegate> context = new ParallelInvokeContext<TDelegate>(args);
lock (context) {
foreach (Delegate invocationDelegate in ((Delegate)(object)@delegate).GetInvocationList()) {
context.QueueInvoke(invocationDelegate);
}
Monitor.Wait(context);
}
}
}
用法:
ParallelInvoke.Invoke(yourDelegate, arguments);
注意事项:
事件处理程序中的异常未得到处理(但 IL 代码有一个 finally 来递减计数器,因此该方法应该正确结束),这可能会导致麻烦。也可以在 IL 代码中捕获和传输异常。
不执行继承以外的隐式转换(例如 int 到 double)并会抛出异常。
所使用的同步技术不分配操作系统等待句柄,这通常有利于提高性能。可以在 Joseph Albahari's page 上找到关于监视器工作原理的描述。 .
经过一些性能测试后,这种方法似乎比任何使用“ native ”BeginInvoke/EndInvoke 调用委托(delegate)的方法(至少在 MS CLR 中)都好得多。
关于C# 事件 : How to process event in a parallel manner,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/1516119/
在 Oracle 中,PARALLEL 被广泛使用。提示 PARALLEL、PARALLEL(8) 和 PARALLEL(a,8) 有什么区别。如何选择最佳的查询提示? SELECT /*+ PARA
好的,我希望以前没有问过这个问题,因为在搜索中很难找到。 我查看了 F95 手册,但仍然觉得这很模糊: For the simple case of: DO i=0,99 END DO 我正
我有一个 C-shell 脚本,其中有一个名为 $hosts_string 的变量,格式为: host1,host2,...,hostN 我还有一个名为 $chrs_string 的变量,其形式为:
是否可以从由gnu parallel产生的脚本的多次运行中调用gnu parallel? 我有一个python脚本,可以运行100个顺序顺序迭代,并且在每次迭代中的某处,并行计算4个值(使用gnu p
我想在几个输入上运行几个长时间运行的进程。例如。: solver_a problem_1 solver_b problem_1 ... solver_b problem_18 solver_c pro
TParallel.&For 和 TParallel.For 之间有区别吗? 两者都可以在 Delphi 10 Seattle 中编译。那么我应该坚持哪一个呢? 最佳答案 TParallel.&For
我第一次使用 julia 进行并行计算.我有点头疼。所以假设我开始 julia如下:julia -p 4 .然后我为所有处理器声明 a 函数,然后将它与 pmap 一起使用还有@parallel fo
关闭。这个问题是off-topic .它目前不接受答案。 想改善这个问题吗? Update the question所以它是 on-topic对于堆栈溢出。 10年前关闭。 Improve this
我有一堆相互排斥的方法,因此可以并行运行。有这样做的好方法吗?到目前为止,我有以下两种实现方式,但我不确定是否应该选择其中一种。 使用 Parallel.For : Parallel.For(0, 2
我对并行运行脚本很感兴趣,并且我已经开始查看 GNU 并行工具,但是我遇到了一些麻烦。我的脚本 doSomething 有 3 个参数,我想在参数的不同值上并行运行脚本。我该怎么做? 我试过:para
我需要在多核(和多线程)机器上运行多个作业。我正在使用 GNU Parallel utility跨核心分配作业以加速任务。要执行的命令在名为“命令”的文件中可用。我使用以下命令运行 GNU Paral
我正在尝试使用如下两个输入运行 Python 脚本。我得到了大约 300 个这两个输入,所以我想知道是否有人可以建议如何并行运行它们。 单次运行看起来像: python stable.py KOG_1
每天我都必须更新一堆存储库,并在其中一些中执行另一个命令(来自 CARTON,Perl 模块依赖管理器)。我总是使用循环来执行此操作,但我想与 并行执行GNU 并行 如果可能,但我不太了解它的tuto
正如标题所说:@parallel 之间究竟有什么区别?和 pmap ?我的意思不是明显的一个是循环的宏,另一个适用于函数,我的意思是它们的实现究竟有什么不同,我应该如何使用这些知识在它们之间进行选择?
我有一些矩阵乘法运算。我想通过多个处理器并行执行这些操作。这可以使用 MPI(消息传递接口(interface))在高性能计算集群上完成。 同样,我可以使用多个辅助角色在云中进行一些并行化吗?有什么办
joblib模块提供了一个简单的帮助程序类,以使用多处理并行编写循环的循环。 这段代码使用列表推导来完成这项工作: import time from math import sqrt from job
我的问题是这样的one .但我想做一些不同的事情... 例如,在我的并行区域内,我想在 4 个线程上运行我的代码。当每个线程进入 for 循环时,我想在 8 个线程上运行我的代码。像 #pramga
我正在尝试使用 ipython 并行库中的并行计算。但是我对此知之甚少,而且我发现很难从对并行计算一无所知的人那里阅读该文档。 有趣的是,我发现的所有教程都只是重复使用文档中的示例,并使用相同的解释,
我的项目结构看起来像 Root + subproj1 + subproj2 在每个子项目中定义了自己的任务 run(){}。 我想要做的是从 Root 项目的运行任务并行运行 :subpro
我有一个 Foo ID 的列表。我需要为每个 ID 调用一个存储过程。 例如 Guid[] siteIds = ...; // typically contains 100 to 300 elemen
我是一名优秀的程序员,十分优秀!