- r - 以节省内存的方式增长 data.frame
- ruby-on-rails - ruby/ruby on rails 内存泄漏检测
- android - 无法解析导入android.support.v7.app
- UNIX 域套接字与共享内存(映射文件)
我正在构建一个必须处理大量数据的控制台应用程序。
基本上,应用程序从数据库中获取引用。对于每个引用,解析文件的内容并进行一些更改。这些文件是 HTML 文件,并且该过程正在使用 RegEx 替换做繁重的工作(查找引用并将它们转换为链接)。然后将结果存储在文件系统中并发送到外部系统。
如果我按顺序继续该过程:
var refs = GetReferencesFromDB(); // ~5000 Datarow returned
foreach(var ref in refs)
{
var filePath = GetFilePath(ref); // This method looks up in a previously loaded file list
var html = File.ReadAllText(filePath); // Read html locally, or from a network drive
var convertedHtml = ParseHtml(html);
File.WriteAllText(destinationFilePath); // Copy the result locally, or a network drive
SendToWs(ref, convertedHtml);
}
我的程序运行正常,但速度很慢。这就是为什么我想并行处理这个过程。
到现在为止,我做了一个简单的并行化添加 AsParallel :
var refs = GetReferencesFromDB().AsParallel();
refs.ForAll(ref=>
{
var filePath = GetFilePath(ref);
var html = File.ReadAllText(filePath);
var convertedHtml = ParseHtml(html);
File.WriteAllText(destinationFilePath);
SendToWs(ref, convertedHtml);
});
这个简单的改变减少了过程的持续时间(减少 25% 的时间)。但是,我对并行化的理解是,如果对依赖 I/O 的资源进行并行化,不会有太多好处(或者更糟,好处更少),因为 I/O 不会神奇地翻倍。
这就是为什么我认为我应该改变我的方法,而不是将整个过程并行化,而是创建依赖链式排队任务。
即,我应该创建如下流程:
Queue read file. When finished, Queue ParseHtml. When finished, Queue both send to WS and write locally. When finished, log the result.
但是,我不知道如何实现这样的想法。
我觉得它会以一组消费者/生产者队列结束,但我没有找到正确的样本。
而且,我不确定是否会有好处。
多谢指教
[编辑] 事实上,我是使用 c# 4.5 的完美人选...如果它是 rtm :)
[编辑 2] 另一件让我认为它没有正确并行化的事情是,在资源监视器中,我看到 CPU、网络 I/O 和磁盘 I/O 的图表不稳定。一个高,另一个低到中
最佳答案
您没有在任何代码中利用任何异步 I/O API。您所做的一切都受 CPU 限制,您所有的 I/O 操作都将浪费 CPU 资源阻塞。 AsParallel
用于计算绑定(bind)任务,如果您想利用异步 I/O,则需要在 <= v4.0 中利用基于异步编程模型 (APM) 的 API。这是通过在您正在使用的基于 I/O 的类上查找 BeginXXX/EndXXX
方法并在可用时利用这些方法来完成的。
初学者请阅读这篇文章:TPL TaskFactory.FromAsync vs Tasks with blocking methods
接下来,无论如何您都不想在这种情况下使用AsParallel
。 AsParallel
启用流式传输,这将导致立即为每个项目安排一个新任务,但您在这里不需要/不想要它。使用 Parallel::ForEach
划分工作会更好。
让我们看看如何使用这些知识在您的特定情况下实现最大并发性:
var refs = GetReferencesFromDB();
// Using Parallel::ForEach here will partition and process your data on separate worker threads
Parallel.ForEach(
refs,
ref =>
{
string filePath = GetFilePath(ref);
byte[] fileDataBuffer = new byte[1048576];
// Need to use FileStream API directly so we can enable async I/O
FileStream sourceFileStream = new FileStream(
filePath,
FileMode.Open,
FileAccess.Read,
FileShare.Read,
8192,
true);
// Use FromAsync to read the data from the file
Task<int> readSourceFileStreamTask = Task.Factory.FromAsync(
sourceFileStream.BeginRead
sourceFileStream.EndRead
fileDataBuffer,
fileDataBuffer.Length,
null);
// Add a continuation that will fire when the async read is completed
readSourceFileStreamTask.ContinueWith(readSourceFileStreamAntecedent =>
{
int soureFileStreamBytesRead;
try
{
// Determine exactly how many bytes were read
// NOTE: this will propagate any potential exception that may have occurred in EndRead
sourceFileStreamBytesRead = readSourceFileStreamAntecedent.Result;
}
finally
{
// Always clean up the source stream
sourceFileStream.Close();
sourceFileStream = null;
}
// This is here to make sure you don't end up trying to read files larger than this sample code can handle
if(sourceFileStreamBytesRead == fileDataBuffer.Length)
{
throw new NotSupportedException("You need to implement reading files larger than 1MB. :P");
}
// Convert the file data to a string
string html = Encoding.UTF8.GetString(fileDataBuffer, 0, sourceFileStreamBytesRead);
// Parse the HTML
string convertedHtml = ParseHtml(html);
// This is here to make sure you don't end up trying to write files larger than this sample code can handle
if(Encoding.UTF8.GetByteCount > fileDataBuffer.Length)
{
throw new NotSupportedException("You need to implement writing files larger than 1MB. :P");
}
// Convert the file data back to bytes for writing
Encoding.UTF8.GetBytes(convertedHtml, 0, convertedHtml.Length, fileDataBuffer, 0);
// Need to use FileStream API directly so we can enable async I/O
FileStream destinationFileStream = new FileStream(
destinationFilePath,
FileMode.OpenOrCreate,
FileAccess.Write,
FileShare.None,
8192,
true);
// Use FromAsync to read the data from the file
Task destinationFileStreamWriteTask = Task.Factory.FromAsync(
destinationFileStream.BeginWrite,
destinationFileStream.EndWrite,
fileDataBuffer,
0,
fileDataBuffer.Length,
null);
// Add a continuation that will fire when the async write is completed
destinationFileStreamWriteTask.ContinueWith(destinationFileStreamWriteAntecedent =>
{
try
{
// NOTE: we call wait here to observe any potential exceptions that might have occurred in EndWrite
destinationFileStreamWriteAntecedent.Wait();
}
finally
{
// Always close the destination file stream
destinationFileStream.Close();
destinationFileStream = null;
}
},
TaskContinuationOptions.AttachedToParent);
// Send to external system **concurrent** to writing to destination file system above
SendToWs(ref, convertedHtml);
},
TaskContinuationOptions.AttachedToParent);
});
现在,这里有一些注意事项:
TaskContinuationOptions.AttachedToParent
。这非常重要,因为它将阻止 Parallel::ForEach
开始工作的工作线程在所有底层异步调用完成之前完成。如果这不是这里,您将同时开始所有 5000 个项目的工作,这将用数千个计划任务污染 TPL 子系统并且根本无法正确扩展。关于c# - 如何正确并行化严重依赖 I/O 的作业,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/8505815/
我在 gobject 上阅读了一个维基百科页面,上面写着, Depending only on GLib and libc, GObject is a cornerstone of GNOME and
如何注册一个依赖属性,其值是使用另一个依赖属性的值计算的? 由于 .NET 属性包装器在运行时被 WPF 绕过,因此不应在 getter 和 setter 中包含逻辑。解决方案通常是使用 Proper
我一直在尝试将 ActionbarSherlock maven 依赖项添加到我的项目中 com.actionbarsherlock library 4.2.0 在我的 po
http://tutorials.jenkov.com/ood/understanding-dependencies.html#whatis说(强调我的): Whenever a class A us
我对所有这些魔法有点不清楚。 据我了解,依赖属性是从 DependencyObject 继承的,因此存储值: 如果分配了值(在本地字典中),则在实例本身中 或者如果未指定值,则从指向父元素的链接中获取
我刚刚更新了在 ASP.NET Framework 4.5.2 版上运行的 MVC Web 应用程序。我正在使用 Twilio 发送 SMS 消息: var twilio = new TwilioRe
我刚刚发现了一件令人生畏的事情。 spring 依赖坐标有两个版本。 项目依赖于 spring mvc 和 spring flow。有两组并行的依赖项。 Spring MVC 具有以下方案的依赖项
我正在尝试包含 的 maven 依赖项 org.jacorb jacorb 2.3.1 依赖已解决,但它导致另一个依赖 picocontainer 出现问题: [ERROR
我正在尝试在 Haskell 项目中包含特定版本的库。该库是住宿加早餐型的(用于 martix 操作),但我需要特定的 0.4.3 版本,该版本修复了乘法实现的错误。 所以,我的 stack.yaml
有谁知道如何制作依赖的 UIPickerView.例如,当我选择组件一的第 2 行时,组件二的标题会发生变化吗? 我在互联网上查找过,没有真正的答案,我尝试过使用 if 和 switch 语句,但它们
我正在编写一个用于验收测试的项目,由于各种原因,这依赖于另一个打包为 WAR 的项目。我已成功使用 maven-dependency-plugin 解压 WAR,但无法让我的项目包含解压的 WEB-I
或多或少我在 session 上大量构建我的网站(特别是重定向用户等),我很好奇这是否是一种危险的做法。禁用浏览器 cookie 保存的用户的大致比例是多少?我愿意接受任何建议:) 谢谢 最佳答案 s
开始玩 Scala futures,我被依赖的 futures 困住了。 让我们举个例子。我搜索地点并获得 Future[Seq[Place]]。对于这些地点中的每一个,我搜索最近的地铁站(该服务返回
或多或少我在 session 上大量构建我的网站(特别是重定向用户等),我很好奇这是否是一种危险的做法。禁用浏览器 cookie 保存的用户的大致比例是多少?我愿意接受任何建议:) 谢谢 最佳答案 s
我有一个二进制文件,需要一些 *.so 文件才能执行。现在,当我尝试在一些旧机器上执行它时,它会显示 /lib/libc.so.6: version `GLIBC_2.4' not found 如何将
我尝试使用 Dygraph 来表示图表,我在 https://github.com/danvk/dygraphs 中找到了代码,但是它有太多的依赖文件,我觉得很烦人。是否有一个文件可以容纳所有必需的
我正在处理一个 javascript 文件,该文件 a) 声明一个具有函数的对象,并且 b) 使用它期望在外部声明的散列调用该对象的 init 函数。我的 Jasmine 规范提示它找不到哈希,因为它
最近我一直在学习 Angular 并且进展顺利,但是关于依赖注入(inject)的一些事情我仍然不清楚。 是否有任何理由在我的 app.js 文件中声明我的应用程序的其他部分(服务、 Controll
考虑一个名为 foo 的表,它有 id (PRIMARY & AUTO_INCREMENT) 列。我正在向该表中插入一行,挑战从此时开始。 $db->query("INSERT INTO `foo`
我正在使用级联下拉 jquery 插件。 (https://github.com/dnasir/jquery-cascading-dropdown) 我有两个下拉菜单。 “客户端”和“站点”。 根据您
我是一名优秀的程序员,十分优秀!