
c# - Thread-safe data buffer for size-controlled bulk inserts


I have a simulation that generates data which must be saved to a database.

ParallelLoopResult res = Parallel.For(0, 1000000, options, (r, state) =>
{
    ComplexDataSet cds = GenerateData(r);

    SaveDataToDatabase(cds);
});

The simulation generates a large amount of data, so it isn't practical to generate everything first and then save it (up to 1 GB of data), and saving it to the database one record at a time makes no sense either (each transaction would be too small to be practical). I want to insert the records as bulk inserts of a controlled size (say, 100 per commit).

However, I don't think my theoretical understanding of parallel computing is good enough. I came up with the following (which, as you can see, is deeply flawed):

DataBuffer buffer = new DataBuffer(...);

ParallelLoopResult res = Parallel.For(0, 10000000, options, (r, state) =>
{
    ComplexDataSet cds = GenerateData(r);

    buffer.SaveDataToBuffer(cds, r == 9999999); // true on the last iteration
});

public class DataBuffer
{
    int count = 0;
    int limit = 100;

    object _locker = new object();

    ConcurrentBag<ComplexDataSet> _lastItemRef;

    ConcurrentQueue<ConcurrentBag<ComplexDataSet>> ComplexDataSetsQueue { get; set; }
        = new ConcurrentQueue<ConcurrentBag<ComplexDataSet>>();

    public void SaveDataToBuffer(ComplexDataSet data, bool isfinalcycle)
    {
        lock (_locker)
        {
            if (count >= limit)
            {
                ConcurrentBag<ComplexDataSet> dequeueRef;
                if (ComplexDataSetsQueue.TryDequeue(out dequeueRef))
                {
                    Commit(dequeueRef);
                }

                _lastItemRef = new ConcurrentBag<ComplexDataSet> { data };
                ComplexDataSetsQueue.Enqueue(_lastItemRef);
                count = 1;
            }
            else
            {
                // First time
                if (_lastItemRef == null)
                {
                    _lastItemRef = new ConcurrentBag<ComplexDataSet> { data };
                    ComplexDataSetsQueue.Enqueue(_lastItemRef);
                    count = 1;
                }
                // If the buffer isn't full
                else
                {
                    _lastItemRef.Add(data);
                    count++;
                }
            }

            if (isfinalcycle)
            {
                // Commit everything that hasn't been committed yet
                ConcurrentBag<ComplexDataSet> dequeueRef;
                while (ComplexDataSetsQueue.TryDequeue(out dequeueRef))
                {
                    Commit(dequeueRef);
                }
            }
        }
    }

    public void Commit(ConcurrentBag<ComplexDataSet> data)
    {
        // Commit data to the database... should this somehow run on another thread?
    }
}

As you can see, I'm using a queue to create a buffer and then manually deciding when to commit. However, I have a strong feeling that this isn't an effective solution to my problem. First, I'm not sure whether I'm locking correctly. Second, I'm not sure whether this is fully thread-safe (or thread-safe at all).

Could you please take a look and comment on what I should do differently? Or is there a better approach altogether (some kind of producer-consumer technique or something else)?

Thanks and best wishes, D.

Best Answer

There's no need to use locks or expensive concurrency-safe data structures. The data items are all independent, so introducing locking and sharing will only hurt performance and scalability.

Parallel.For has an overload that lets you specify per-thread state. That's where you can keep a private queue and a private database connection.

Also: Parallel.For internally partitions your range into smaller chunks. Passing it a huge range is perfectly efficient, so there's nothing to change there.

Parallel.For(0, 10000000, () => new ThreadState(),
    (i, loopstate, threadstate) =>
    {
        ComplexDataSet data = GenerateData(i);

        threadstate.Add(data);

        return threadstate;
    }, threadstate => threadstate.Dispose());

sealed class ThreadState : IDisposable
{
    readonly IDisposable db;
    readonly Queue<ComplexDataSet> queue = new Queue<ComplexDataSet>();

    public ThreadState()
    {
        // Initialize db with a private MongoDb connection.
    }

    public void Add(ComplexDataSet cds)
    {
        queue.Enqueue(cds);

        if (queue.Count == 100)
        {
            Commit();
        }
    }

    void Commit()
    {
        db.Write(queue);
        queue.Clear();
    }

    public void Dispose()
    {
        try
        {
            if (queue.Count > 0)
            {
                Commit();
            }
        }
        finally
        {
            db.Dispose();
        }
    }
}
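A minimal, runnable sketch of the batching behaviour in ThreadState above, with a hypothetical in-memory FakeDb standing in for the private MongoDb connection (FakeDb, BatchDemo, and Run are illustration-only names, not part of the original answer):

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for the database connection,
// used only to observe when Commit() fires.
sealed class FakeDb
{
    public int Commits;
    public void Write(IEnumerable<int> batch) { Commits++; }
}

static class BatchDemo
{
    // Returns how many commits a run of `items` data points produces
    // with a batch size of 100 (the same threshold as ThreadState.Add).
    public static int Run(int items)
    {
        var db = new FakeDb();
        var queue = new Queue<int>();

        for (int i = 0; i < items; i++)
        {
            queue.Enqueue(i);
            if (queue.Count == 100)
            {
                db.Write(queue); // full batch
                queue.Clear();
            }
        }

        // Final flush of the partial batch, as in ThreadState.Dispose().
        if (queue.Count > 0)
        {
            db.Write(queue);
            queue.Clear();
        }
        return db.Commits;
    }

    static void Main()
    {
        // 250 items -> two full batches of 100 plus a final flush of 50.
        Console.WriteLine(BatchDemo.Run(250)); // prints 3
    }
}
```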

Now, MongoDb currently doesn't support truly concurrent inserts - it holds some expensive locks on the server, so parallel commits won't buy you much (if any) speed. They want to fix this in the future, so one day you may get a free speedup.

If you need to limit the number of database connections held open, a producer/consumer setup is a good alternative. You can do this efficiently with a BlockingCollection queue, without using any locks:

// Specify a maximum of 1000 items in the collection so that we don't
// run out of memory if we get data faster than we can commit it.
// Add() will wait if it is full.

BlockingCollection<ComplexDataSet> commits =
    new BlockingCollection<ComplexDataSet>(1000);

Task consumer = Task.Factory.StartNew(() =>
{
    // This is the consumer. It processes the
    // "commits" queue until it signals completion.

    while (!commits.IsCompleted)
    {
        ComplexDataSet cds;

        // Timeout of -1 will wait for an item or IsCompleted == true.

        if (commits.TryTake(out cds, -1))
        {
            // Got at least one item, write it.
            db.Write(cds);

            // Continue dequeuing until the queue is empty, where it will
            // timeout instantly and return false, or until we've dequeued
            // 100 items.

            for (int i = 1; i < 100 && commits.TryTake(out cds, 0); ++i)
            {
                db.Write(cds);
            }

            // Now that we're waiting for more items or have dequeued 100
            // of them, commit. More items can continue to be added to the
            // queue by other threads while this commit is processing.

            db.Commit();
        }
    }
}, TaskCreationOptions.LongRunning);

try
{
    // This is the producer.

    Parallel.For(0, 1000000, i =>
    {
        ComplexDataSet data = GenerateData(i);
        commits.Add(data);
    });
}
finally // Put in a finally to ensure the task closes down.
{
    commits.CompleteAdding(); // Marks adding complete; IsCompleted becomes true once drained.
    consumer.Wait();          // Wait for the task to finish committing all the items.
}
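The blocking semantics the comments above rely on can be seen in a small self-contained sketch (BoundedQueueDemo and Run are hypothetical names for illustration): a bounded BlockingCollection makes Add() wait when the queue is full, and CompleteAdding() lets a waiting TryTake() return false once the queue drains.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

static class BoundedQueueDemo
{
    // Pushes 1..n through a queue bounded to 2 items and returns the
    // sum the consumer saw.
    public static int Run(int n)
    {
        var queue = new BlockingCollection<int>(boundedCapacity: 2);

        var consumer = Task.Run(() =>
        {
            int total = 0;
            int item;
            // A -1 timeout waits until an item arrives or adding is complete.
            while (queue.TryTake(out item, -1))
                total += item;
            return total;
        });

        for (int i = 1; i <= n; i++)
            queue.Add(i);       // blocks whenever the queue already holds 2 items

        queue.CompleteAdding(); // lets the consumer's TryTake return false when drained
        return consumer.Result;
    }

    static void Main()
    {
        Console.WriteLine(BoundedQueueDemo.Run(5)); // 1+2+3+4+5 = 15
    }
}
```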

Regarding "c# - Thread-safe data buffer for size-controlled bulk inserts", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/6928559/
