gpt4 book ai didi

c# - 用于处理大数据量(100 万条记录等)的数据结构和技术

转载 作者:行者123 更新时间:2023-11-30 20:46:20 56 4
gpt4 key购买 nike

我一直在开发的一个 WPF .NET 4.5 应用程序,最初用于处理小数据量,现在可以处理 100 万甚至更多的大得多的数据量,当然我开始耗尽内存。数据来自 MS SQL DB,数据处理需要加载到本地数据结构,因为这些数据随后会被 CLR 中的代码转换/处理/引用,因此需要连续不间断的数据访问,但并非所有数据都具有立即加载到内存中,但只有在实际访问时才加载。作为一个小示例,反距离插值器使用此数据生成插值 map ,所有数据都需要传递给它以生成连续的网格。

我已经重写了应用程序的某些部分来处理数据,例如在任何给定时间只加载 x 行,并实现了一种有效的数据处理滑动窗口方法。然而,对应用程序的其余部分执行此操作将需要一些时间投入,我想知道是否可以有更稳健和标准的方法来解决这个设计问题(必须有,我不是第一个)?

tldr; C# 是否提供任何数据结构或技术以中断方式访问大量数据,因此它的行为类似于 IEnumerable 但数据在实际访问或需要之前不在内存中,还是完全由我来管理内存使用?我的理想是一种结构,它可以自动实现类似缓冲区的机制,并在访问该数据时加载更多数据,并从已访问但不再感兴趣的数据中释放内存。也许像一些带有内部缓冲区的 DataTable?

最佳答案

就遍历太大而无法放入内存的非常大的数据集而言,您可以使用生产者-消费者模型。当我处理包含数十亿条记录的自定义数据集时,我使用了类似的东西——总共大约 2 TB 的数据。

我们的想法是拥有一个包含生产者和消费者的类。当您创建该类的新实例时,它会启动一个生产者线程来填充受限并发队列。该线程使队列保持完整。消费者部分是让您获取下一条记录的 API。

您从一个共享的并发队列开始。我喜欢 .NET BlockingCollection为此。

下面是一个读取文本文件并维护一个包含 10,000 行文本的队列的示例。

public class TextFileLineBuffer
{
private const int QueueSize = 10000;
private BlockingCollection<string> _buffer = new BlockingCollection<string>(QueueSize);
private CancellationTokenSource _cancelToken;
private StreamReader reader;

public TextFileLineBuffer(string filename)
{
// File is opened here so that any exception is thrown on the calling thread.
_reader = new StreamReader(filename);
_cancelToken = new CancellationTokenSource();
// start task that reads the file
Task.Factory.StartNew(ProcessFile, TaskCreationOptions.LongRunning);
}

public string GetNextLine()
{
if (_buffer.IsCompleted)
{
// The buffer is empty because the file has been read
// and all lines returned.
// You can either call this an error and throw an exception,
// or you can return null.
return null;
}

// If there is a record in the buffer, it is returned immediately.
// Otherwise, Take does a non-busy wait.

// You might want to catch the OperationCancelledException here and return null
// rather than letting the exception escape.

return _buffer.Take(_cancelToken.Token);
}

private void ProcessFile()
{
while (!_reader.EndOfStream && !_cancelToken.Token.IsCancellationRequested)
{
var line = _reader.ReadLine();
try
{
// This will block if the buffer already contains QueueSize records.
// As soon as a space becomes available, this will add the record
// to the buffer.
_buffer.Add(line, _cancelToken.Token);
}
catch (OperationCancelledException)
{
;
}
}
_buffer.CompleteAdding();
}

public void Cancel()
{
_cancelToken.Cancel();
}
}

这就是它的基本原理。您需要添加一个 Dispose 方法来确保线程已终止并且文件已关闭。

我已经在许多不同的程序中使用了这种基本方法并取得了良好的效果。您必须进行一些分析和测试以确定适合您的应用程序的最佳缓冲区大小。您需要足够大的内存来跟上正常的数据流并处理突发的事件,但又不能大到超出您的内存预算。

IEnumerable 修改

如果要支持IEnumerable<T> , 你必须做一些小的修改。我将扩展我的示例以支持 IEnumerable<String> .

首先,您必须更改类声明:

public class TextFileLineBuffer: IEnumerable<string>

然后,你必须实现GetEnumerator :

public IEnumerator<String> GetEnumerator()
{
foreach (var s in _buffer.GetConsumingEnumerable())
{
yield return s;
}
}

IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}

这样,您就可以初始化这个东西,然后将它传递给任何需要 IEnumerable<string> 的代码。 .于是就变成了:

var items = new TextFileLineBuffer(filename);
DoSomething(items);

void DoSomething(IEnumerable<string> list)
{
foreach (var s in list)
Console.WriteLine(s);
}

关于c# - 用于处理大数据量(100 万条记录等)的数据结构和技术,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27162219/

56 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com