
C# Parallel.For does not execute every step

Reposted. Author: 太空宇宙. Last updated: 2023-11-03 23:36:59

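I have been building a mock-up of an import service that currently runs sequentially. However, my mock-up seems to have a strange problem: sometimes one or two items in the for loop are never executed.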

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

class Service
{
    private Thread _worker;
    private volatile bool _stopping; // written by Stop(), read by the worker thread
    private CancellationTokenSource _cts;
    private ParallelOptions _po;
    private Repository _repository;

    public void Start(Repository repository)
    {
        _repository = repository;
        _cts = new CancellationTokenSource();
        _po = new ParallelOptions
        {
            CancellationToken = _cts.Token
        };

        _worker = new Thread(ProcessImport);
        _worker.Start();
    }

    public void Stop()
    {
        _stopping = true;
        _cts.Cancel();
        if (_worker != null && _worker.IsAlive)
            _worker.Join();
    }

    private void ProcessImport()
    {
        while (!_stopping)
        {
            var import = _repository.GetInProgressImport();
            if (import == null)
            {
                Thread.Sleep(1000);
                continue;
            }

            try
            {
                Parallel.For(0, 1000, _po, i => Work.DoWork(i, import, _cts.Token, _repository));
            }
            catch (OperationCanceledException)
            {
                // Unmark batch so it can be started again
                var batch = _repository.GetBatch(import.BatchId);
                batch.Processing = false;
                _repository.UpdateBatch(batch);
                Console.WriteLine("Aborted import {0}", import.ImportId);
            }
            catch (Exception ex)
            {
                Console.WriteLine("Something went wrong: {0}", ex.Message);
            }
        }
    }
}

class Work
{
    public static void DoWork(int i, Import import, CancellationToken ct, Repository repository)
    {
        // Simulate doing some work
        Thread.Sleep(100);
        HandleAbort(ct);
        Thread.Sleep(100);
        HandleAbort(ct);
        Thread.Sleep(100);

        // Update the batch (read, increment, write back)
        var batch = repository.GetBatch(import.BatchId);
        batch.Processed++;
        if (batch.Processed == batch.Total)
        {
            batch.Finished = DateTime.Now;
            batch.Processing = false;
        }
        repository.UpdateBatch(batch);
    }

    // Throws OperationCanceledException once Stop() has cancelled the token
    private static void HandleAbort(CancellationToken ct)
    {
        ct.ThrowIfCancellationRequested();
    }
}
```

With this code, I often find that the batch never finishes and ends with batch.Processed = 999 or 998.

Can anyone shed light on what I am doing wrong?

Thanks in advance.
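The symptom (a counter that stops at 998 or 999 out of 1000) can be reproduced without the repository at all. The sketch below is an assumed, minimal stand-in for the question's loop: `LostUpdateDemo`, `CountRacy` and `CountAtomic` are hypothetical names, and `Thread.SpinWait` replaces the `Thread.Sleep` calls as simulated work.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical repro: the same Parallel.For, counted two different ways.
public static class LostUpdateDemo
{
    // Racy: counter++ is a read, an add and a write. Two threads can read the
    // same old value and both write back old + 1, so one increment is lost.
    public static int CountRacy(int iterations)
    {
        int counter = 0;
        Parallel.For(0, iterations, i =>
        {
            Thread.SpinWait(1000); // simulated work
            counter++;
        });
        return counter;
    }

    // Atomic: Interlocked.Increment performs the read-modify-write as one operation.
    public static int CountAtomic(int iterations)
    {
        int counter = 0;
        Parallel.For(0, iterations, i =>
        {
            Thread.SpinWait(1000); // simulated work
            Interlocked.Increment(ref counter);
        });
        return counter;
    }
}
```

On a multi-core machine `CountRacy(1000)` will often return less than 1000, although how often depends on hardware and timing; `CountAtomic(1000)` should always return exactly 1000.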

Edit:

To clarify the repository/batch objects: I believe my current model is thread-safe.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

class Repository
{
    // ConcurrentBag makes adding to and enumerating the bags thread-safe
    private readonly ConcurrentBag<Batch> _batchData = new ConcurrentBag<Batch>();
    private readonly ConcurrentBag<Import> _importData = new ConcurrentBag<Import>();

    public void CreateImport(Import import)
    {
        _importData.Add(import);
    }

    // Picks the unfinished, idle batch with the most remaining work and marks it as processing
    public Import GetInProgressImport()
    {
        var import = _importData
            .Join(_batchData, i => i.BatchId, b => b.BatchId, (i, b) => new
            {
                Import = i,
                Batch = b
            })
            .Where(j => j.Batch.Processed < j.Batch.Total && !j.Batch.Processing)
            .OrderByDescending(j => j.Batch.Total - j.Batch.Processed)
            .ThenBy(j => j.Batch.BatchId)
            .Select(j => j.Import)
            .FirstOrDefault();

        if (import == null)
            return null;

        // Mark the batch as processing
        var batch = GetBatch(import.BatchId);
        batch.Processing = true;
        UpdateBatch(batch);

        return import;
    }

    public List<Import> ListImports()
    {
        return _importData.ToList();
    }

    public void CreateBatch(Batch batch)
    {
        _batchData.Add(batch);
    }

    public Batch GetBatch(Int64 batchId)
    {
        return _batchData.FirstOrDefault(b => b.BatchId == batchId);
    }

    // Copies the given values onto the stored batch with the same BatchId
    public void UpdateBatch(Batch batch)
    {
        var batchData = _batchData.First(b => b.BatchId == batch.BatchId);
        batchData.Total = batch.Total;
        batchData.Processed = batch.Processed;
        batchData.Started = batch.Started;
        batchData.Finished = batch.Finished;
        batchData.Processing = batch.Processing;
    }
}

class Import
{
    public Int64 ImportId { get; set; }
    public Int64 BatchId { get; set; }
}

class Batch
{
    public Int64 BatchId { get; set; }
    public int Total { get; set; }
    public int Processed { get; set; }
    public DateTime Created { get; set; }
    public DateTime Started { get; set; }
    public DateTime Finished { get; set; }
    public bool Processing { get; set; }
}
```

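This is just a mock-up, so there is no database or other persistence behind my repository.

Also, I am not checking my batch against the value of i. I am checking the number of loop iterations recorded by the batch object's Processed property (the work that has actually been completed).

Thanks.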
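One way to test the thread-safety belief above: `ConcurrentBag<T>` makes adding to and enumerating the bag safe, but it stores references, so every thread that calls `GetBatch` receives the same `Batch` instance and mutates it directly. The cut-down types below (`MiniBatch`, `MiniRepository`, `SharedReferenceDemo`) are hypothetical stand-ins for the question's classes, kept just large enough to show this.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;

// Hypothetical cut-down version of the question's Batch.
public class MiniBatch
{
    public long BatchId { get; set; }
    public int Processed { get; set; }
}

// Hypothetical cut-down version of the question's Repository.
public class MiniRepository
{
    private readonly ConcurrentBag<MiniBatch> _batchData = new ConcurrentBag<MiniBatch>();

    // Thread-safe: the bag itself is protected.
    public void CreateBatch(MiniBatch batch) => _batchData.Add(batch);

    // Returns the stored object itself, not a copy, so all callers share it.
    public MiniBatch GetBatch(long batchId) => _batchData.FirstOrDefault(b => b.BatchId == batchId);
}

public static class SharedReferenceDemo
{
    public static bool CallersShareOneInstance()
    {
        var repo = new MiniRepository();
        repo.CreateBatch(new MiniBatch { BatchId = 1 });
        MiniBatch seenByThreadA = repo.GetBatch(1);
        MiniBatch seenByThreadB = repo.GetBatch(1);
        return ReferenceEquals(seenByThreadA, seenByThreadB);
    }
}
```

`CallersShareOneInstance()` returns `true`. Assuming batch IDs are unique, a consequence for the question's `Repository` is that `UpdateBatch(batch)` finds that same instance and copies its properties onto itself, so it adds no synchronization: `batch.Processed++` in `DoWork` remains an unsynchronized read-modify-write on one shared object.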

Solution:

I forgot that updates to the batch need to be synchronized. It should look like this:

```csharp
using System;
using System.Threading;

class Work
{
    // One lock shared by every DoWork call, so only one thread at a time
    // can read, increment and write back a batch
    private static readonly object _sync = new object();

    public static void DoWork(int i, Import import, CancellationToken ct, Repository repository)
    {
        // Do work (outside the lock, so iterations still run in parallel)
        Thread.Sleep(100);
        HandleAbort(ct);
        Thread.Sleep(100);
        HandleAbort(ct);
        Thread.Sleep(100);

        lock (_sync)
        {
            // Update the batch
            var batch = repository.GetBatch(import.BatchId);
            batch.Processed++;
            if (batch.Processed == batch.Total)
            {
                batch.Finished = DateTime.Now;
                batch.Processing = false;
            }
            repository.UpdateBatch(batch);
        }
    }

    // Throws OperationCanceledException once the token has been cancelled
    private static void HandleAbort(CancellationToken ct)
    {
        ct.ThrowIfCancellationRequested();
    }
}
```
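The lock-based fix can be exercised in isolation. This sketch assumes a stripped-down batch type (`LockedBatch`) and a hypothetical driver (`LockedUpdateDemo.Run`) instead of the question's `Repository`. What it keeps from the solution is that the whole read, increment and completion check sits inside one lock, while the simulated work stays outside it so iterations still overlap.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for Batch: only the fields the update touches.
public class LockedBatch
{
    public int Total { get; set; }
    public int Processed { get; set; }
    public bool Processing { get; set; }
    public DateTime Finished { get; set; }
}

public static class LockedUpdateDemo
{
    private static readonly object _sync = new object();

    // Runs batch.Total parallel iterations, each doing the solution's
    // increment-and-check under one lock. Returns how many iterations
    // took the "finished" branch.
    public static int Run(LockedBatch batch)
    {
        int finishedHits = 0;
        Parallel.For(0, batch.Total, i =>
        {
            Thread.SpinWait(1000); // simulated work, outside the lock
            lock (_sync)
            {
                batch.Processed++;
                if (batch.Processed == batch.Total)
                {
                    batch.Finished = DateTime.Now;
                    batch.Processing = false;
                    finishedHits++; // safe: still inside the lock
                }
            }
        });
        return finishedHits;
    }
}
```

With the lock in place, `Processed` should end at exactly `Total` and the "finished" branch should run exactly once. The lock only protects code that takes it, so any other code that modifies the same batch while the loop is running would need the same lock.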

Best answer

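It looks like updates to batch.Processed are being lost. The increment is not atomic: batch.Processed++; is a read, an add and a write, so concurrent iterations race on it. Use Interlocked.Increment.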
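A sketch of the `Interlocked.Increment` route, under two assumptions. First, `Interlocked.Increment` takes its argument by `ref`, and a property such as the question's `Batch.Processed` cannot be passed by `ref`, so the hypothetical `CountingBatch` type here exposes `Processed` as a public field. Second, the completion check uses the value returned by `Increment` rather than re-reading `Processed`, so exactly one iteration observes the final count. `InterlockedUpdateDemo` is likewise a hypothetical name.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical variant of Batch: Processed is a field so it can be passed by ref.
public class CountingBatch
{
    public int Total;
    public int Processed; // only ever modified through Interlocked
    public DateTime Finished;
}

public static class InterlockedUpdateDemo
{
    // Returns how many iterations ran the completion code.
    public static int Run(CountingBatch batch)
    {
        int finishedHits = 0;
        Parallel.For(0, batch.Total, i =>
        {
            Thread.SpinWait(1000); // simulated work
            // Increment returns the new value; exactly one thread gets Total back.
            int done = Interlocked.Increment(ref batch.Processed);
            if (done == batch.Total)
            {
                batch.Finished = DateTime.Now;
                Interlocked.Increment(ref finishedHits);
            }
        });
        return finishedHits;
    }
}
```

Re-reading `batch.Processed` after the increment, as the original `DoWork` does, would still be unreliable even with `Interlocked`: by the time a thread reads it, other threads may have incremented it as well, so more than one thread could see it equal to `Total` and run the completion code twice.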

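In my opinion, your understanding of threading is not yet deep enough. Doing threading this complex without a solid understanding is very dangerous. The mistakes you are making are hard to catch in testing, but production will find them.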

Regarding "C# Parallel.For does not execute every step", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/30303718/
