gpt4 book ai didi

c# - 如何正确并行化严重依赖 I/O 的作业

转载 作者:IT王子 更新时间:2023-10-29 04:30:54 24 4
gpt4 key购买 nike

我正在构建一个必须处理大量数据的控制台应用程序。

基本上,应用程序从数据库中获取引用。对于每个引用,解析文件的内容并进行一些更改。这些文件是 HTML 文件,并且该过程正在使用 RegEx 替换做繁重的工作(查找引用并将它们转换为链接)。然后将结果存储在文件系统中并发送到外部系统。

如果我按顺序继续该过程:

var refs = GetReferencesFromDB(); // ~5000 Datarow returned
foreach(var ref in refs)
{
var filePath = GetFilePath(ref); // This method looks up in a previously loaded file list
var html = File.ReadAllText(filePath); // Read html locally, or from a network drive
var convertedHtml = ParseHtml(html);
File.WriteAllText(destinationFilePath); // Copy the result locally, or a network drive
SendToWs(ref, convertedHtml);
}

我的程序运行正常,但速度很慢。这就是为什么我想并行处理这个过程。

到现在为止,我做了一个简单的并行化添加 AsParallel :

var refs = GetReferencesFromDB().AsParallel(); 
refs.ForAll(ref=>
{
var filePath = GetFilePath(ref);
var html = File.ReadAllText(filePath);
var convertedHtml = ParseHtml(html);
File.WriteAllText(destinationFilePath);
SendToWs(ref, convertedHtml);
});

这个简单的改变减少了过程的持续时间(减少 25% 的时间)。但是,我对并行化的理解是,如果对依赖 I/O 的资源进行并行化,不会有太多好处(或者更糟,好处更少),因为 I/O 不会神奇地翻倍。

这就是为什么我认为我应该改变我的方法,而不是将整个过程并行化,而是创建依赖链式排队任务。

即,我应该创建如下流程:

Queue read file. When finished, Queue ParseHtml. When finished, Queue both send to WS and write locally. When finished, log the result.

但是,我不知道如何实现这样的想法。

我觉得它会以一组消费者/生产者队列结束,但我没有找到正确的样本。

而且,我不确定是否会有好处。

多谢指教

[编辑] 事实上,我是使用 c# 4.5 的完美人选...如果它是 rtm :)

[编辑 2] 另一件让我认为它没有正确并行化的事情是,在资源监视器中,我看到 CPU、网络 I/O 和磁盘 I/O 的图表不稳定。一个高,另一个低到中

最佳答案

您没有在任何代码中利用任何异步 I/O API。您所做的一切都受 CPU 限制,您所有的 I/O 操作都将浪费 CPU 资源阻塞。 AsParallel 用于计算绑定(bind)任务,如果您想利用异步 I/O,则需要在 <= v4.0 中利用基于异步编程模型 (APM) 的 API。这是通过在您正在使用的基于 I/O 的类上查找 BeginXXX/EndXXX 方法并在可用时利用这些方法来完成的。

初学者请阅读这篇文章:TPL TaskFactory.FromAsync vs Tasks with blocking methods

接下来,无论如何您都不想在这种情况下使用AsParallelAsParallel 启用流式传输,这将导致立即为每个项目安排一个新任务,但您在这里不需要/不想要它。使用 Parallel::ForEach 划分工作会更好。

让我们看看如何使用这些知识在您的特定情况下实现最大并发性:

var refs = GetReferencesFromDB();

// Using Parallel::ForEach here will partition and process your data on separate worker threads
Parallel.ForEach(
refs,
ref =>
{
string filePath = GetFilePath(ref);

byte[] fileDataBuffer = new byte[1048576];

// Need to use FileStream API directly so we can enable async I/O
FileStream sourceFileStream = new FileStream(
filePath,
FileMode.Open,
FileAccess.Read,
FileShare.Read,
8192,
true);

// Use FromAsync to read the data from the file
Task<int> readSourceFileStreamTask = Task.Factory.FromAsync(
sourceFileStream.BeginRead
sourceFileStream.EndRead
fileDataBuffer,
fileDataBuffer.Length,
null);

// Add a continuation that will fire when the async read is completed
readSourceFileStreamTask.ContinueWith(readSourceFileStreamAntecedent =>
{
int soureFileStreamBytesRead;

try
{
// Determine exactly how many bytes were read
// NOTE: this will propagate any potential exception that may have occurred in EndRead
sourceFileStreamBytesRead = readSourceFileStreamAntecedent.Result;
}
finally
{
// Always clean up the source stream
sourceFileStream.Close();
sourceFileStream = null;
}

// This is here to make sure you don't end up trying to read files larger than this sample code can handle
if(sourceFileStreamBytesRead == fileDataBuffer.Length)
{
throw new NotSupportedException("You need to implement reading files larger than 1MB. :P");
}

// Convert the file data to a string
string html = Encoding.UTF8.GetString(fileDataBuffer, 0, sourceFileStreamBytesRead);

// Parse the HTML
string convertedHtml = ParseHtml(html);

// This is here to make sure you don't end up trying to write files larger than this sample code can handle
if(Encoding.UTF8.GetByteCount > fileDataBuffer.Length)
{
throw new NotSupportedException("You need to implement writing files larger than 1MB. :P");
}

// Convert the file data back to bytes for writing
Encoding.UTF8.GetBytes(convertedHtml, 0, convertedHtml.Length, fileDataBuffer, 0);

// Need to use FileStream API directly so we can enable async I/O
FileStream destinationFileStream = new FileStream(
destinationFilePath,
FileMode.OpenOrCreate,
FileAccess.Write,
FileShare.None,
8192,
true);

// Use FromAsync to read the data from the file
Task destinationFileStreamWriteTask = Task.Factory.FromAsync(
destinationFileStream.BeginWrite,
destinationFileStream.EndWrite,
fileDataBuffer,
0,
fileDataBuffer.Length,
null);

// Add a continuation that will fire when the async write is completed
destinationFileStreamWriteTask.ContinueWith(destinationFileStreamWriteAntecedent =>
{
try
{
// NOTE: we call wait here to observe any potential exceptions that might have occurred in EndWrite
destinationFileStreamWriteAntecedent.Wait();
}
finally
{
// Always close the destination file stream
destinationFileStream.Close();
destinationFileStream = null;
}
},
TaskContinuationOptions.AttachedToParent);

// Send to external system **concurrent** to writing to destination file system above
SendToWs(ref, convertedHtml);
},
TaskContinuationOptions.AttachedToParent);
});

现在,这里有一些注意事项:

  1. 这是示例代码,因此我使用 1MB 的缓冲区来读/写文件。这对于 HTML 文件来说是过多的并且浪费系统资源。您可以降低它以满足您的最大需求,或者将链式读/写实现到 StringBuilder 中,这是我留给您的练习,因为我将编写 ~500 多行代码来执行异步链式读/写。 :P
  2. 您会注意到,在读/写任务的延续上,我有 TaskContinuationOptions.AttachedToParent。这非常重要,因为它将阻止 Parallel::ForEach 开始工作的工作线程在所有底层异步调用完成之前完成。如果这不是这里,您将同时开始所有 5000 个项目的工作,这将用数千个计划任务污染 TPL 子系统并且根本无法正确扩展。
  3. 我并发调用 SendToWs 将文件写入此处的文件共享。我不知道 SendToWs 实现的基础是什么,但它听起来也很适合制作异步。现在假定它是纯计算工作,因此,将在执行时消耗 CPU 线程。我将其作为练习留给您,以弄清楚如何最好地利用我向您展示的内容来提高那里的吞吐量。
  4. 这是所有类型的自由形式,我的大脑是这里唯一的编译器,SO 的语法高亮是我用来确保语法良好的全部。所以,请原谅任何语法错误,如果我把任何事情搞砸得太严重以至于你无法理解它,请告诉我,我会跟进。

关于c# - 如何正确并行化严重依赖 I/O 的作业,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/8505815/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com