gpt4 book ai didi

.net - Parallel.ForEach - 优雅的取消

转载 作者:行者123 更新时间:2023-12-03 22:36:21 27 4
gpt4 key购买 nike

关于等待任务完成和线程同步的主题。

我目前有一个包含在 Parallel.ForEach 中的迭代。在下面的示例中,我在评论中提出了一些关于如何最好地处理循环的优雅终止(.NET 4.0)的问题;

private void myFunction()
{

IList<string> iListOfItems = new List<string>();
// populate iListOfItems

CancellationTokenSource cts = new CancellationTokenSource();

ParallelOptions po = new ParallelOptions();
po.MaxDegreeOfParallelism = 20; // max threads
po.CancellationToken = cts.Token;

try
{
var myWcfProxy = new myWcfClientSoapClient();

if (Parallel.ForEach(iListOfItems, po, (item, loopsate) =>
{
try
{
if (_requestedToStop)
loopsate.Stop();
// long running blocking WS call, check before and after
var response = myWcfProxy.ProcessIntervalConfiguration(item);
if (_requestedToStop)
loopsate.Stop();

// perform some local processing of the response object
}
catch (Exception ex)
{
// cannot continue game over.
if (myWcfProxy.State == CommunicationState.Faulted)
{
loopsate.Stop();
throw;
}
}

// else carry on..
// raise some events and other actions that could all risk an unhanded error.

}
).IsCompleted)
{
RaiseAllItemsCompleteEvent();
}
}
catch (Exception ex)
{
// if an unhandled error is raised within one of the Parallel.ForEach threads, do all threads in the
// ForEach abort? or run to completion? Is loopsate.Stop (or equivalent) called as soon as the framework raises an Exception?
// Do I need to call cts.Cancel here?

// I want to wait for all the threads to terminate before I continue at this point. Howe do we achieve that?

// do i need to call cts.Dispose() ?

MessageBox.Show(Logging.FormatException(ex));
}
finally
{

if (myWcfProxy != null)
{
// possible race condition with the for-each threads here unless we wait for them to terminate.
if (myWcfProxy.State == System.ServiceModel.CommunicationState.Faulted)
myWcfProxy.Abort();

myWcfProxy.Close();
}

// possible race condition with the for-each threads here unless we wait for them to terminate.
_requestedToStop = false;

}

}

非常感激任何的帮助。 MSDN 文档谈到了 ManualResetEventSlim 和 cancelToken.WaitHandle。但不确定如何将它们连接到这里,似乎很难理解 MSDN 示例,因为大多数不适用。

最佳答案

我在下面模拟了一些可以回答您问题的代码。基本点是您使用 Parallel.ForEach 获得 fork/join 并行性,因此您无需担心并行任务之外的竞争条件(调用线程阻塞,直到任务完成,成功或其他方式)。您只想确保使用 LoopState 变量(lambda 的第二个参数)来控制循环状态。

如果循环的任何迭代抛出未处理的异常,则整个循环将引发最后捕获的 AggregateException。

提及此主题的其他链接:

Parallel.ForEach throws exception when processing extremely large sets of data

http://msdn.microsoft.com/en-us/library/dd460720.aspx

Does Parallel.ForEach limits the number of active threads?

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.ServiceModel;

namespace Temp
{
public class Class1
{
private class MockWcfProxy
{
internal object ProcessIntervalConfiguration(string item)
{
return new Object();
}

public CommunicationState State { get; set; }
}

private void myFunction()
{

IList<string> iListOfItems = new List<string>();
// populate iListOfItems

CancellationTokenSource cts = new CancellationTokenSource();

ParallelOptions po = new ParallelOptions();
po.MaxDegreeOfParallelism = 20; // max threads
po.CancellationToken = cts.Token;

try
{
var myWcfProxy = new MockWcfProxy();

if (Parallel.ForEach(iListOfItems, po, (item, loopState) =>
{
try
{
if (loopState.ShouldExitCurrentIteration || loopState.IsExceptional)
loopState.Stop();

// long running blocking WS call, check before and after
var response = myWcfProxy.ProcessIntervalConfiguration(item);

if (loopState.ShouldExitCurrentIteration || loopState.IsExceptional)
loopState.Stop();

// perform some local processing of the response object
}
catch (Exception ex)
{
// cannot continue game over.
if (myWcfProxy.State == CommunicationState.Faulted)
{
loopState.Stop();
throw;
}

// FYI you are swallowing all other exceptions here...
}

// else carry on..
// raise some events and other actions that could all risk an unhanded error.
}
).IsCompleted)
{
RaiseAllItemsCompleteEvent();
}
}
catch (AggregateException aggEx)
{
// This section will be entered if any of the loops threw an unhandled exception.
// Because we re-threw the WCF exeption above, you can use aggEx.InnerExceptions here
// to see those (if you want).
}
// Execution will not get to this point until all of the iterations have completed (or one
// has failed, and all that were running when that failure occurred complete).
}

private void RaiseAllItemsCompleteEvent()
{
// Everything completed...
}
}
}

关于.net - Parallel.ForEach - 优雅的取消,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/4671865/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com