
c# - Why doesn't WithMergeOptions(ParallelMergeOptions.NotBuffered) provide results immediately?

Reposted. Author: 太空宇宙. Updated: 2023-11-03 11:09:34

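(I'm currently limited to .NET 4.0.)

I have a situation where I want to process items in parallel as much as possible, the order must be preserved, and items can be added at any time until "stop" is pressed.

Items can come in "bursts", so the queue may drain completely, there will be a pause, and then a large number of items will come in again.

I want each result to be available as soon as it is finished.

Here is a simplified example: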

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        BlockingCollection<int> itemsQueue = new BlockingCollection<int>();

        Random random = new Random();

        var results = itemsQueue
            .GetConsumingEnumerable()
            .AsParallel()
            .AsOrdered()
            .WithMergeOptions(ParallelMergeOptions.NotBuffered)
            .Select(i =>
            {
                int work = 0;

                Console.WriteLine("Working on " + i);

                //simulate work
                for (int busy = 0; busy <= 90000000; ++busy) { ++work; }

                Console.WriteLine("Finished " + i);

                return i;
            });

        TaskCompletionSource<bool> completion = new TaskCompletionSource<bool>();

        Task.Factory.StartNew(() =>
        {
            foreach (int i in results)
            {
                Console.WriteLine("Result Available: " + i);
            }
            completion.SetResult(true);
        });

        int iterations;
        iterations = random.Next(5, 50);
        Console.WriteLine("------- iterations: " + iterations + "-------");

        for (int i = 1; i <= iterations; ++i)
        {
            itemsQueue.Add(i);
        }

        while (true)
        {
            char c = Console.ReadKey().KeyChar;

            if (c == 's')
            {
                break;
            }
            else
            {
                ++iterations;

                Console.WriteLine("adding: " + iterations);
                itemsQueue.Add(iterations);
            }
        }

        itemsQueue.CompleteAdding();

        completion.Task.Wait();

        Console.WriteLine("Done!");
        Console.ReadKey();
        itemsQueue.Dispose();
    }
}

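As in the example above, what typically happens is that the last few results (I'm not 100% sure about this, but the number of results it stops short by may roughly correlate with the number of cores on the box) do not become available until itemsQueue.CompleteAdding(); is called (in this example, when the "s" key is pressed), at which point the remaining results finally become available.

Why aren't the results available immediately, even though I specified .WithMergeOptions(ParallelMergeOptions.NotBuffered), and how can I make them available immediately?

Best answer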

Note that if you are able to call the BlockingCollection.CompleteAdding() instance method, the problem is not a problem: that call causes all results to complete.
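A minimal sketch of that point, using a hypothetical CompleteAddingDemo.ProcessBatch helper: once CompleteAdding() has been called, GetConsumingEnumerable() ends, so even the ordered, NotBuffered PLINQ query from the question runs to completion and returns every result in order. It assumes the whole batch is known up front, which is exactly the situation the question rules out.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

public static class CompleteAddingDemo
{
    // Hypothetical helper: pushes a fixed batch through the same kind of
    // ordered, unbuffered PLINQ query as the question and returns all results.
    public static List<int> ProcessBatch(IEnumerable<int> batch)
    {
        using (var queue = new BlockingCollection<int>())
        {
            foreach (int item in batch)
                queue.Add(item);

            // Without this call the query below would never finish, because
            // GetConsumingEnumerable() keeps waiting for more items.
            queue.CompleteAdding();

            return queue.GetConsumingEnumerable()
                .AsParallel()
                .AsOrdered()
                .WithMergeOptions(ParallelMergeOptions.NotBuffered)
                .Select(i => i * 10)
                .ToList();
        }
    }
}
```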

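Short answer

If, on the other hand, you need to maintain order, need the results as soon as possible, and have no opportunity to call BlockingCollection.CompleteAdding(), then, if at all possible, it is best to consume the items from the queue non-parallel, but to parallelize the processing of each individual task.

For example: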

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

class Program
{
    //Not parallel, but suitable for monitoring queue purposes,
    //can then focus on parallelizing each individual task
    static void Main(string[] args)
    {
        BlockingCollection<int> itemsQueue = new BlockingCollection<int>();

        Random random = new Random();

        var results = itemsQueue.GetConsumingEnumerable()
            .Select(i =>
            {
                Console.WriteLine("Working on " + i);

                //Focus your parallelization efforts on the work of
                //the individual task
                //e.g. simulated:
                double work = Enumerable.Range(0, 90000000 - (10 * (i % 3)))
                    .AsParallel()
                    .Select(w => w + 1)
                    .Average();

                Console.WriteLine("Finished " + i);

                return i;
            });

        TaskCompletionSource<bool> completion = new TaskCompletionSource<bool>();

        Task.Factory.StartNew(() =>
        {
            foreach (int i in results)
            {
                Console.WriteLine("Result Available: " + i);
            }
            completion.SetResult(true);
        });

        int iterations;
        iterations = random.Next(5, 50);
        Console.WriteLine("------- iterations: " + iterations + "-------");

        for (int i = 1; i <= iterations; ++i)
        {
            itemsQueue.Add(i);
        }

        while (true)
        {
            char c = Console.ReadKey().KeyChar;

            if (c == 's')
            {
                break;
            }
            else
            {
                ++iterations;

                Console.WriteLine("adding: " + iterations);
                itemsQueue.Add(iterations);
            }
        }

        itemsQueue.CompleteAdding();

        completion.Task.Wait();

        Console.WriteLine("Done!");
        Console.ReadKey();
        itemsQueue.Dispose();
    }
}
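The same idea can be packaged as a small reusable helper. This is only a sketch: SequentialConsumer, ConsumeInOrder and SumOfSquares are hypothetical names, and SumOfSquares merely stands in for per-item work that is parallelized internally with PLINQ. Because items are taken strictly one at a time, each result is reported as soon as its own work finishes, without waiting for later items or for CompleteAdding().

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;

public static class SequentialConsumer
{
    // Takes items off the queue one at a time, in order, and hands each one to
    // 'process', which may parallelize its own work internally. The result is
    // passed to 'onResult' immediately after 'process' returns.
    public static void ConsumeInOrder<TInput, TResult>(
        BlockingCollection<TInput> queue,
        Func<TInput, TResult> process,
        Action<TResult> onResult)
    {
        foreach (TInput item in queue.GetConsumingEnumerable())
        {
            onResult(process(item));
        }
    }

    // Example per-item work: the parallelism lives inside a single item.
    public static long SumOfSquares(int n)
    {
        return Enumerable.Range(1, n)
            .AsParallel()
            .Select(x => (long)x * x)
            .Sum();
    }
}
```

The trade-off is that throughput depends on how well each individual item's work parallelizes; a queue of many tiny items gains little from this pattern.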

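Longer answer

There appears to be an interaction between the BlockingCollection and AsOrdered().

It seems that AsOrdered() will stall processing at one of the enumerators in a partition chunk.

The default partitioner works on chunks that are usually larger than 1, and the blocking collection will block until a chunk can be filled (or until CompleteAdding() is called).

However, even with a chunk size of 1, the problem does not go away completely.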
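For reference, .NET 4.5 and later (not the .NET 4.0 the question is limited to) can request a chunk size of 1 directly via Partitioner.Create with EnumerablePartitionerOptions.NoBuffering. The sketch below uses hypothetical class and method names; per the observation above, a chunk size of 1 on its own is not expected to remove the stall completely, so treat this as a way to experiment rather than a fix.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

public static class NoBufferingDemo
{
    // Requires .NET 4.5+: NoBuffering makes the partitioner hand out one
    // element at a time instead of growing chunks.
    public static List<int> ProcessWithChunkSizeOne(BlockingCollection<int> queue, Func<int, int> work)
    {
        var partitioner = Partitioner.Create(
            queue.GetConsumingEnumerable(),
            EnumerablePartitionerOptions.NoBuffering);

        // The partitioner supplies order keys, so AsOrdered() still
        // returns results in the order items were dequeued.
        return partitioner
            .AsParallel()
            .AsOrdered()
            .WithMergeOptions(ParallelMergeOptions.NotBuffered)
            .Select(work)
            .ToList();
    }
}
```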

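If you want to experiment with this, you can sometimes observe the same behavior when implementing your own partitioner. (Note that if you specify .WithDegreeOfParallelism(1), the problem of results waiting to appear goes away, but of course a degree of parallelism of 1 defeats the purpose!)

For example: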

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public class ImmediateOrderedPartitioner<T> : OrderablePartitioner<T>
{
    private readonly IEnumerable<T> _consumingEnumerable;
    private readonly Ordering _ordering = new Ordering();

    // keysOrderedInEachPartition: true; keysOrderedAcrossPartitions: false
    // (partitions interleave the keys 0, 1, 2, ...); keysNormalized: true.
    public ImmediateOrderedPartitioner(BlockingCollection<T> collection) : base(true, false, true)
    {
        _consumingEnumerable = collection.GetConsumingEnumerable();
    }

    // Shared by all partitions: holds the last order key handed out and also
    // serves as the lock that keeps key assignment in step with dequeue order.
    private class Ordering
    {
        public long Order = -1;
    }

    private class MyEnumerator<S> : IEnumerator<KeyValuePair<long, S>>
    {
        private readonly IEnumerable<S> _enumerable;

        private readonly Ordering _ordering;

        private KeyValuePair<long, S> _current;

        private bool _hasItem;

        public MyEnumerator(IEnumerable<S> consumingEnumerable, Ordering ordering)
        {
            _enumerable = consumingEnumerable;
            _ordering = ordering;
        }

        public KeyValuePair<long, S> Current
        {
            get
            {
                if (_hasItem)
                {
                    return _current;
                }
                throw new InvalidOperationException();
            }
        }

        public void Dispose()
        {
        }

        object System.Collections.IEnumerator.Current
        {
            get { return Current; }
        }

        public bool MoveNext()
        {
            // Lock on the shared Ordering so that taking an item and assigning
            // its key happen atomically across all partitions.
            lock (_ordering)
            {
                bool canMoveNext = false;

                // Takes exactly one item, blocking until one is available
                // or CompleteAdding() has been called.
                var next = _enumerable.Take(1).FirstOrDefault(s => { canMoveNext = true; return true; });

                if (canMoveNext)
                {
                    // One increment per item keeps the keys normalized (0, 1, 2, ...).
                    _current = new KeyValuePair<long, S>(++_ordering.Order, next);
                    _hasItem = true;
                }
                else
                {
                    _hasItem = false;
                }

                return canMoveNext;
            }
        }

        public void Reset()
        {
            throw new NotSupportedException();
        }
    }

    public override IList<IEnumerator<KeyValuePair<long, T>>> GetOrderablePartitions(int partitionCount)
    {
        var result = new List<IEnumerator<KeyValuePair<long, T>>>();

        // One enumerator per partition: each partition is consumed by its own
        // thread, so a single shared enumerator would race on Current. The
        // shared Ordering instance provides the cross-partition locking.
        for (int i = 0; i < partitionCount; ++i)
        {
            result.Add(new MyEnumerator<T>(_consumingEnumerable, _ordering));
        }

        return result;
    }

    public override bool SupportsDynamicPartitions
    {
        get { return false; }
    }

    public override IEnumerable<T> GetDynamicPartitions()
    {
        throw new NotSupportedException();
    }

    public override IEnumerable<KeyValuePair<long, T>> GetOrderableDynamicPartitions()
    {
        throw new NotSupportedException();
    }

    public override IList<IEnumerator<T>> GetPartitions(int partitionCount)
    {
        throw new NotSupportedException();
    }
}

class Program
{
    static void Main(string[] args)
    {
        BlockingCollection<int> itemsQueue = new BlockingCollection<int>();

        var partitioner = new ImmediateOrderedPartitioner<int>(itemsQueue);

        Random random = new Random();

        var results = partitioner
            .AsParallel()
            .AsOrdered()
            .WithMergeOptions(ParallelMergeOptions.NotBuffered)
            //.WithDegreeOfParallelism(1)
            .Select(i =>
            {
                int work = 0;

                Console.WriteLine("Working on " + i);

                for (int busy = 0; busy <= 90000000; ++busy) { ++work; }

                Console.WriteLine("Finished " + i);

                return i;
            });

        TaskCompletionSource<bool> completion = new TaskCompletionSource<bool>();

        Task.Factory.StartNew(() =>
        {
            foreach (int i in results)
            {
                Console.WriteLine("Result Available: " + i);
            }
            completion.SetResult(true);
        });

        int iterations;
        iterations = 1; // random.Next(5, 50);
        Console.WriteLine("------- iterations: " + iterations + "-------");

        for (int i = 1; i <= iterations; ++i)
        {
            itemsQueue.Add(i);
        }

        while (true)
        {
            char c = Console.ReadKey().KeyChar;

            if (c == 's')
            {
                break;
            }
            else
            {
                ++iterations;

                Console.WriteLine("adding: " + iterations);
                itemsQueue.Add(iterations);
            }
        }

        itemsQueue.CompleteAdding();

        completion.Task.Wait();

        Console.WriteLine("Done!");
        Console.ReadKey();
        itemsQueue.Dispose();
    }
}
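MoveNext() in the partitioner takes exactly one item via _enumerable.Take(1).FirstOrDefault(...). If a partitioner keeps a reference to the BlockingCollection<T> itself, BlockingCollection<T>.TryTake(out item, Timeout.Infinite) expresses the same thing more directly: it blocks until an item arrives and returns false only once CompleteAdding() has been called and the collection is empty. The sketch below uses hypothetical names and also pairs each item with a normalized order key (0, 1, 2, ...), which is what an OrderablePartitioner constructed with keysNormalized = true promises to produce.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

public static class BlockingTakeDemo
{
    // Blocks until an item is available; returns false only when CompleteAdding()
    // has been called and the collection has been drained.
    public static bool TryTakeNext<T>(BlockingCollection<T> collection, out T item)
    {
        return collection.TryTake(out item, Timeout.Infinite);
    }

    // Drains the collection one item at a time, pairing each item with a
    // normalized order key in dequeue order. Like GetConsumingEnumerable(),
    // this keeps blocking until CompleteAdding() is called.
    public static List<KeyValuePair<long, T>> TakeWithKeys<T>(BlockingCollection<T> collection)
    {
        var result = new List<KeyValuePair<long, T>>();
        long nextKey = 0;
        T item;
        while (TryTakeNext(collection, out item))
        {
            result.Add(new KeyValuePair<long, T>(nextKey++, item));
        }
        return result;
    }
}
```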

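Alternative approach

If parallelizing the individual task (as suggested in the "Short answer") is not possible, and all the other constraints of the problem apply, you can implement your own queue type that starts a task for each item. This lets the Task Parallel Library handle the scheduling of the work, while you synchronize the consumption of the results yourself.

For example, something like the following (with the standard "no guarantees" disclaimer!):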

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public class QueuedItem<TInput, TResult>
{
    private readonly object _lockObject = new object();

    private TResult _result;

    private readonly TInput _input;

    private readonly TResult _notfinished;

    // True only for the sentinel item that marks the end of the queue.
    internal readonly bool IsEndQueue;

    internal QueuedItem()
    {
        IsEndQueue = true;
    }

    public QueuedItem(TInput input, TResult notfinished)
    {
        _input = input;
        _notfinished = notfinished;
        _result = _notfinished;
    }

    public TResult ReadResult()
    {
        lock (_lockObject)
        {
            if (!IsResultReady)
                throw new InvalidOperationException("Check IsResultReady before calling ReadResult()");

            return _result;
        }
    }

    public void WriteResult(TResult value)
    {
        lock (_lockObject)
        {
            if (IsResultReady)
                throw new InvalidOperationException("Result has already been written");

            // Writing the sentinel would leave the item "not ready" forever
            // and stall every later result behind it.
            if (object.Equals(value, _notfinished))
                throw new ArgumentException("The result must differ from the 'not finished' value", "value");

            _result = value;
        }
    }

    public TInput Input { get { return _input; } }

    public bool IsResultReady
    {
        get
        {
            lock (_lockObject)
            {
                return !object.Equals(_result, _notfinished) || IsEndQueue;
            }
        }
    }
}


public class ParallelImmediateOrderedProcessingQueue<TInput, TResult>
{
    private readonly ReaderWriterLockSlim _addLock = new ReaderWriterLockSlim();

    private readonly object _readingResultsLock = new object();

    private readonly ConcurrentQueue<QueuedItem<TInput, TResult>> _concurrentQueue = new ConcurrentQueue<QueuedItem<TInput, TResult>>();

    private bool _isFinishedAdding = false;

    private readonly TResult _notFinished;

    private readonly Action<QueuedItem<TInput, TResult>> _processor;

    private readonly TaskCompletionSource<bool> _completion = new TaskCompletionSource<bool>();

    /// <param name="notFinished">A value that indicates the result is not yet finished</param>
    /// <param name="processor">Must call WriteResult() on its argument when finished.</param>
    public ParallelImmediateOrderedProcessingQueue(TResult notFinished, Action<QueuedItem<TInput, TResult>> processor)
    {
        _notFinished = notFinished;
        _processor = processor;
    }

    // Raised whenever the item at the head of the queue has its result ready.
    public event Action ResultsReady = delegate { };

    private void SignalResult()
    {
        QueuedItem<TInput, TResult> item;
        if (_concurrentQueue.TryPeek(out item) && item.IsResultReady)
        {
            ResultsReady();
        }
    }

    public void Add(TInput input)
    {
        bool shouldThrow = false;

        // Read lock: many Add calls may run concurrently, but none can overlap
        // CompleteAddingItems(), which takes the write lock.
        _addLock.EnterReadLock();
        try
        {
            shouldThrow = _isFinishedAdding;

            if (!shouldThrow)
            {
                var queuedItem = new QueuedItem<TInput, TResult>(input, _notFinished);

                _concurrentQueue.Enqueue(queuedItem);

                Task.Factory.StartNew(() => { _processor(queuedItem); SignalResult(); });
            }
        }
        finally
        {
            _addLock.ExitReadLock();
        }

        if (shouldThrow)
            throw new InvalidOperationException("An attempt was made to add an item, but adding items was marked as completed");
    }

    public IEnumerable<TResult> ConsumeReadyResults()
    {
        //lock necessary to preserve ordering
        lock (_readingResultsLock)
        {
            QueuedItem<TInput, TResult> queuedItem;

            while (_concurrentQueue.TryPeek(out queuedItem) && queuedItem.IsResultReady)
            {
                if (!_concurrentQueue.TryDequeue(out queuedItem))
                    throw new ApplicationException("this shouldn't happen");

                if (queuedItem.IsEndQueue)
                {
                    _completion.SetResult(true);
                }
                else
                {
                    yield return queuedItem.ReadResult();
                }
            }
        }
    }

    public void CompleteAddingItems()
    {
        _addLock.EnterWriteLock();
        try
        {
            _isFinishedAdding = true;

            var queueCompletion = new QueuedItem<TInput, TResult>();

            _concurrentQueue.Enqueue(queueCompletion);
            Task.Factory.StartNew(() => { SignalResult(); });
        }
        finally
        {
            _addLock.ExitWriteLock();
        }
    }

    public void WaitForCompletion()
    {
        _completion.Task.Wait();
    }
}

class Program
{
    static void Main(string[] args)
    {
        const int notFinished = int.MinValue;

        var processingQueue = new ParallelImmediateOrderedProcessingQueue<int, int>(notFinished, qi =>
        {
            int work = 0;

            Console.WriteLine("Working on " + qi.Input);

            //simulate work
            int maxBusy = 90000000 - (10 * (qi.Input % 3));
            for (int busy = 0; busy <= maxBusy; ++busy) { ++work; }

            Console.WriteLine("Finished " + qi.Input);

            qi.WriteResult(qi.Input);
        });

        processingQueue.ResultsReady += () =>
        {
            Task.Factory.StartNew(() =>
            {
                foreach (int result in processingQueue.ConsumeReadyResults())
                {
                    Console.WriteLine("Results Available: " + result);
                }
            });
        };

        int iterations = new Random().Next(5, 50);
        Console.WriteLine("------- iterations: " + iterations + "-------");

        for (int i = 1; i <= iterations; ++i)
        {
            processingQueue.Add(i);
        }

        while (true)
        {
            char c = Console.ReadKey().KeyChar;

            if (c == 's')
            {
                break;
            }
            else
            {
                ++iterations;

                Console.WriteLine("adding: " + iterations);
                processingQueue.Add(iterations);
            }
        }

        processingQueue.CompleteAddingItems();
        processingQueue.WaitForCompletion();

        Console.WriteLine("Done!");
        Console.ReadKey();
    }
}
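A simpler variant of the same idea can be built from .NET 4.0 primitives alone: start a Task for each item and queue the Tasks themselves in arrival order, so the consumer only has to wait on each Task's Result in turn. This is a swapped-in sketch, not the answer's implementation; OrderedTaskQueue and its members are hypothetical names, and it assumes Add is called from a single producer thread (or is otherwise serialized) so that call order defines result order.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Each added item gets its own Task; the Tasks are queued in arrival order.
// A result is yielded as soon as it and every earlier item have finished.
public sealed class OrderedTaskQueue<TInput, TResult> : IDisposable
{
    private readonly BlockingCollection<Task<TResult>> _tasks = new BlockingCollection<Task<TResult>>();
    private readonly Func<TInput, TResult> _process;

    public OrderedTaskQueue(Func<TInput, TResult> process)
    {
        _process = process;
    }

    public void Add(TInput input)
    {
        var task = new Task<TResult>(() => _process(input));
        _tasks.Add(task); // throws if CompleteAdding() was already called; the task then never starts
        task.Start();
    }

    public void CompleteAdding()
    {
        _tasks.CompleteAdding();
    }

    // Yields results in input order until CompleteAdding() has been called
    // and every queued task has been consumed.
    public IEnumerable<TResult> GetResults()
    {
        foreach (Task<TResult> task in _tasks.GetConsumingEnumerable())
        {
            yield return task.Result; // blocks only until this particular task finishes
        }
    }

    public void Dispose()
    {
        _tasks.Dispose();
    }
}
```

Compared with the sentinel-based queue above, this needs neither a "not finished" value nor a ResultsReady event; the cost is one consumer thread that blocks on Result while it waits for the head of the queue.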

Regarding "c# - Why doesn't WithMergeOptions(ParallelMergeOptions.NotBuffered) provide results immediately?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/14575009/
