- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
我写了下面的方法来批处理一个巨大的 CSV 文件。这个想法是从文件中读取一大块行到内存中,然后将这些行 block 分成固定大小的批处理。获得分区后,将这些分区发送到服务器(同步或异步),这可能需要一段时间。
private static void BatchProcess(string filePath, int chunkSize, int batchSize)
{
List<string> chunk = new List<string>(chunkSize);
foreach (var line in File.ReadLines(filePath))
{
if (chunk.Count == chunk.Capacity)
{
// Partition each chunk into smaller chunks grouped on column 1
var partitions = chunk.GroupBy(c => c.Split(',')[0], (key, g) => g);
// Further breakdown the chunks into batch size groups
var groups = partitions.Select(x => x.Select((i, index) =>
new { i, index }).GroupBy(g => g.index / batchSize, e => e.i));
// Get batches from groups
var batches = groups.SelectMany(x => x)
.Select(y => y.Select(z => z)).ToList();
// Process all batches asynchronously
batches.AsParallel().ForAll(async b =>
{
WebClient client = new WebClient();
byte[] bytes = System.Text.Encoding.ASCII
.GetBytes(b.SelectMany(x => x).ToString());
await client.UploadDataTaskAsync("myserver.com", bytes);
});
// clear the chunk
chunk.Clear();
}
chunk.Add(line);
}
}
这段代码似乎不是很高效,原因有两个。
从 CSV 文件读取的主线程被阻塞,直到所有的分区都被处理完。
AsParallel 会阻塞直到所有任务完成。因此,如果线程池中有更多线程可用于工作,我不会使用它们,因为任务数不受分区数的限制。
batchSize 是固定的,因此无法更改,但 chunkSize 可以针对性能进行调整。我可以选择足够大的 chunkSize,这样就不会创建任何批处理 >> 系统中没有可用的线程,但这仍然意味着 Parallel.ForEach 方法会阻塞,直到所有任务完成。
我如何更改代码,以便利用系统中的所有可用线程来完成工作而无需闲置。我在想我可以使用 BlockingCollection 来存储批处理,但不确定要给它多大的容量,因为每个 block 中没有批处理是动态的。
关于如何使用 TPL 最大化线程利用率以便系统上的大多数可用线程始终在做某事有什么想法吗?
更新:这是我到目前为止使用 TPL 数据流得到的结果。这是正确的吗?
private static void UploadData(string filePath, int chunkSize, int batchSize)
{
var buffer1 = new BatchBlock<string>(chunkSize);
var buffer2 = new BufferBlock<IEnumerable<string>>();
var action1 = new ActionBlock<string[]>(t =>
{
Console.WriteLine("Got a chunk of lines " + t.Count());
// Partition each chunk into smaller chunks grouped on column 1
var partitions = t.GroupBy(c => c.Split(',')[0], (key, g) => g);
// Further breakdown the chunks into batch size groups
var groups = partitions.Select(x => x.Select((i, index) =>
new { i, index }).GroupBy(g => g.index / batchSize, e => e.i));
// Get batches from groups
var batches = groups.SelectMany(x => x).Select(y => y.Select(z => z));
foreach (var batch in batches)
{
buffer2.Post(batch);
}
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
buffer1.LinkTo(action1, new DataflowLinkOptions
{ PropagateCompletion = true });
var action2 = new TransformBlock<IEnumerable<string>,
IEnumerable<string>>(async b =>
{
await ExecuteBatch(b);
return b;
}, new ExecutionDataflowBlockOptions
{ MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
buffer2.LinkTo(action2, new DataflowLinkOptions
{ PropagateCompletion = true });
var action3 = new ActionBlock<IEnumerable<string>>(b =>
{
Console.WriteLine("Finised executing a batch");
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
action2.LinkTo(action3, new DataflowLinkOptions
{ PropagateCompletion = true });
Task produceTask = Task.Factory.StartNew(() =>
{
foreach (var line in File.ReadLines(filePath))
{
buffer1.Post(line);
}
//Once marked complete your entire data flow will signal a stop for
// all new items
Console.WriteLine("Finished producing");
buffer1.Complete();
});
Task.WaitAll(produceTask);
Console.WriteLine("Produced complete");
action1.Completion.Wait();//Will add all the items to buffer2
Console.WriteLine("Action1 complete");
buffer2.Complete();//will not get any new items
action2.Completion.Wait();//Process the batch of 5 and then complete
Task.Wait(action3.Completion);
Console.WriteLine("Process complete");
Console.ReadLine();
}
最佳答案
你很接近,在 TPL 中,数据从一个 block 流到另一个 block ,你应该尽量保持这种范式。因此,例如 action1 应该是一个 TransformManyBlock,因为 ActionBlock
是一个 ITargetBlock
(即终止 block )。
当您在链接上指定传播完成时,完成事件会自动路由到 block 中,因此您只需在最后一个 block 上执行一个 wait()。
将 is 想象成一个多米诺骨牌链,您在第一个 block 上调用 complete,它将通过链传播到最后一个 block 。
您还应该考虑什么是多线程以及为什么要使用多线程;您的示例严重受 I/O 限制,我认为绑定(bind)一堆线程来等待 I/O 完成不是正确的解决方案。
最后,请注意阻塞与否。在您的示例中,buffer1.Post(...)
不是阻塞调用,您没有理由在任务中使用它。
我编写了以下使用 TPL DataFlow 的示例代码:
static void Main(string[] args)
{
var filePath = "C:\\test.csv";
var chunkSize = 1024;
var batchSize = 128;
var linkCompletion = new DataflowLinkOptions
{
PropagateCompletion = true
};
var uploadData = new ActionBlock<IEnumerable<string>>(
async (data) =>
{
WebClient client = new WebClient();
var payload = data.SelectMany(x => x).ToArray();
byte[] bytes = System.Text.Encoding.ASCII.GetBytes(payload);
//await client.UploadDataTaskAsync("myserver.com", bytes);
await Task.Delay(2000);
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded /* Prefer to limit that to some reasonable value */ });
var lineBuffer = new BatchBlock<string>(chunkSize);
var splitData = new TransformManyBlock<IEnumerable<string>, IEnumerable<string>>(
(data) =>
{
// Partition each chunk into smaller chunks grouped on column 1
var partitions = data.GroupBy(c => c.Split(',')[0]);
// Further beakdown the chunks into batch size groups
var groups = partitions.Select(x => x.Select((i, index) => new { i, index }).GroupBy(g => g.index / batchSize, e => e.i));
// Get batches from groups
var batches = groups.SelectMany(x => x).Select(y => y.Select(z => z));
// Don't forget to enumerate before returning
return batches.ToList();
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
lineBuffer.LinkTo(splitData, linkCompletion);
splitData.LinkTo(uploadData, linkCompletion);
foreach (var line in File.ReadLines(filePath))
{
lineBuffer.Post(line);
}
lineBuffer.Complete();
// Wait for uploads to finish
uploadData.Completion.Wait();
}
关于c# - 如何在 TPL 数据流中执行异步操作以获得最佳性能?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32031783/
在这段令人惊叹的视频 ( https://www.youtube.com/watch?v=udix3GZouik ) 中,Alex Blom 谈到了 Ember 在移动世界中的“黑客攻击”。 在 22
我们希望通过我们的应用收集使用情况统计信息。因此,我们希望在服务器端的某个地方跟踪用户操作。 就性能而言,哪个选项更合适: 在 App Engine 请求日志中跟踪用户操作。即为每个用户操作写入一个日
在针对对象集合的 LINQ 查询的幕后究竟发生了什么?它只是语法糖还是发生了其他事情使其更有效的查询? 最佳答案 您是指查询表达式,还是查询在幕后的作用? 查询表达式首先扩展为“普通”C#。例如: v
我正在构建一个简单的照片库应用程序,它在列表框中显示图像。 xaml 是:
对于基于 Web 的企业应用程序,使用“静态 Hashmap 存储对象” 和 apache java 缓存系统有何优缺点?哪一个最有利于性能并减少堆内存问题 例如: Map store=Applica
我想知道在性能方面存储类变量的最佳方式是什么。我的意思是,由于 Children() 函数,存储一个 div id 比查找所有其他类名更好。还是把类名写在变量里比较好? 例如这样: var $inne
我已经阅读了所有这些关于 cassandra 有多快的文章,例如单行读取可能需要大约 5 毫秒。 到目前为止,我不太关心我的网站速度,但是随着网站变得越来越大,一些页面开始需要相当多的查询,例如一个页
最近,我在缓存到内存缓存之前的查询一直需要很长时间才能处理!在这个例子中,它花费了 10 秒。在这种情况下,我要做的就是获得 10 个最近的点击。 我感觉它加载了所有 125,592 行然后只返回 1
我找了几篇文章(包括SA中的一些问题),试图找到基本操作的成本。 但是,我尝试制作自己的小程序,以便自己进行测试。在尝试测试加法和减法时,我遇到了一些问题,我用简单的代码向您展示了这一点
这个问题在这里已经有了答案: Will Java app slow down by presence of -Xdebug or only when stepping through code? (
我记得很久以前读过 with() 对 JavaScript 有一些严重的性能影响,因为它可能对范围堆栈进行非确定性更改。我很难找到最近对此的讨论。这仍然是真的吗? 最佳答案 与其说 with 对性能有
我们有一个数据仓库,其中包含非规范化表,行数从 50 万行到 6 多万行不等。我正在开发一个报告解决方案,因此出于性能原因我们正在使用数据库分页。我们的报告有搜索条件,并且我们已经创建了必要的索引,但
我有一条有效的 SQL 语句,但需要很长时间才能处理 我有一个 a_log 表和一个 people 表。我需要在 people 表中找到给定人员的每个 ID 的最后一个事件和关联的用户。 SELECT
很难说出这里问的是什么。这个问题是含糊的、模糊的、不完整的、过于宽泛的或修辞性的,无法以目前的形式得到合理的回答。如需帮助澄清此问题以便重新打开它,visit the help center 。 已关
通常当我建立一个站点时,我将所有的 CSS 放在一个文件中,并且一次性定义与一组元素相关的所有属性。像这样: #myElement { color: #fff; background-
两者之间是否存在任何性能差异: p { margin:0px; padding:0px; } 并省略最后的分号: p { margin:0px; padding:0px } 提前致谢!
我的应用程序 (PHP) 需要执行大量高精度数学运算(甚至可能出现一共100个数字) 通过这个论坛的最后几篇帖子,我发现我必须使用任何高精度库,如 BC Math 或 GMP,因为 float 类型不
我一直在使用 javamail 从 IMAP 服务器(目前是 GMail)检索邮件。 Javamail 非常快速地从服务器检索特定文件夹中的消息列表(仅 id),但是当我实际获取消息(仅包含甚至不包含
我非常渴望开发我的第一个 Ruby 应用程序,因为我的公司终于在内部批准了它的使用。 在我读到的关于 Ruby v1.8 之前的所有内容中,从来没有任何关于性能的正面评价,但我没有发现关于 1.9 版
我是 Redis 的新手,我有一个包含数百万个成员(member) ID、电子邮件和用户名的数据集,并且正在考虑将它们存储在例如列表结构中。我认为 list 和 sorted set 可能最适合我的情
我是一名优秀的程序员,十分优秀!