gpt4 book ai didi

c# - 异步生产者/消费者

转载 作者:行者123 更新时间:2023-11-30 17:52:01 30 4
gpt4 key购买 nike

我有一个从多个线程访问的类的实例。此类接受此调用并将元组添加到数据库中。我需要以串行方式完成此操作,因为由于某些数据库约束,并行线程可能会导致数据库不一致。

由于我不熟悉 C# 中的并行性和并发性,所以我这样做了:

private BlockingCollection<Task> _tasks = new BlockingCollection<Task>();

public void AddDData(string info)
{
Task t = new Task(() => { InsertDataIntoBase(info); });
_tasks.Add(t);
}

private void InsertWorker()
{
Task.Factory.StartNew(() =>
{
while (!_tasks.IsCompleted)
{
Task t;
if (_tasks.TryTake(out t))
{
t.Start();
t.Wait();
}
}
});
}

AddDData 是被多线程调用的,InsertDataIntoBase 是一个非常简单的插入,只需几毫秒。

问题是,出于某种原因,我缺乏知识,无法弄清楚,有时一个任务会被调用两次!它总是这样:

T1T2T3T1 <- PK 错误。T4...

我对 .Take() 的理解是否完全错误,是我遗漏了什么,还是我的生产者/消费者实现真的很糟糕?

最好的问候,拉斐尔

更新:

按照建议,我使用此架构进行了快速沙盒测试实现,正如我所怀疑的那样,它不能保证在前一个任务完成之前不会触发任务。

enter image description here

所以问题仍然存在:如何正确地排队任务并按顺序触发它们?

更新 2:

我简化了代码:

private BlockingCollection<Data> _tasks = new BlockingCollection<Data>();

public void AddDData(Data info)
{
_tasks.Add(info);
}

private void InsertWorker()
{
Task.Factory.StartNew(() =>
{
while (!_tasks.IsCompleted)
{
Data info;
if (_tasks.TryTake(out info))
{
InsertIntoDB(info);
}
}
});
}

请注意,我摆脱了 Tasks,因为我依赖于同步的 InsertIntoDB 调用(因为它在一个循环中),但仍然没有运气......这一代很好,我绝对确定只有独特的实例是去排队。但无论我怎么尝试,有时同一个对象会被使用两次。

最佳答案

我认为这应该可行:

    private static BlockingCollection<string> _itemsToProcess = new BlockingCollection<string>();

static void Main(string[] args)
{
InsertWorker();
GenerateItems(10, 1000);
_itemsToProcess.CompleteAdding();
}

private static void InsertWorker()
{
Task.Factory.StartNew(() =>
{
while (!_itemsToProcess.IsCompleted)
{
string t;
if (_itemsToProcess.TryTake(out t))
{
// Do whatever needs doing here
// Order should be guaranteed since BlockingCollection
// uses a ConcurrentQueue as a backing store by default.
// http://msdn.microsoft.com/en-us/library/dd287184.aspx#remarksToggle
Console.WriteLine(t);
}
}
});
}

private static void GenerateItems(int count, int maxDelayInMs)
{
Random r = new Random();
string[] items = new string[count];

for (int i = 0; i < count; i++)
{
items[i] = i.ToString();
}

// Simulate many threads adding items to the collection
items
.AsParallel()
.WithDegreeOfParallelism(4)
.WithExecutionMode(ParallelExecutionMode.ForceParallelism)
.Select((x) =>
{
Thread.Sleep(r.Next(maxDelayInMs));
_itemsToProcess.Add(x);
return x;
}).ToList();
}

这确实意味着消费者是单线程的,但允许多个生产者线程。

关于c# - 异步生产者/消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/18749185/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com