gpt4 book ai didi

c# - 具有大量文件IO任务的多线程

转载 作者:太空狗 更新时间:2023-10-30 01:17:08 28 4
gpt4 key购买 nike

我对 C# 并不是完全陌生,但我对这门语言还不够熟悉,不知道如何做我需要做的事情。

我有一个文件,将其命名为 File1.txt。 File1.txt 有 100,000 行左右。我将复制 File1.txt 并将其命名为 File1_untested.txt。我还将创建一个空文件“Successes.txt”对于文件中的每一行:

  • 从 File1_untested.txt 中删除这一行
  • 如果此行通过测试,将其写入Successes.txt

所以,我的问题是,我该如何多线程处理它?<​​/p>

到目前为止,我的方法是创建一个对象 (LineChecker),为对象提供要检查的行,然后将对象传递到 ThreadPool。我了解如何将 ThreadPools 用于一些带有 CountdownEvent 的任务。但是,一下子排队10万个任务,似乎不太合理。我怎样才能逐渐喂水池?也许一次 1000 行或类似的东西。

另外,我需要确保没有两个线程同时添加到 Successes.txt 或从 File1_untested.txt 中删除。我可以用 lock() 处理这个,对吧?我应该将什么传递给 lock()?我可以使用 LineChecker 的静态成员吗​​?

我只是想对如何设计这样的东西有一个广泛的了解。

最佳答案

由于测试需要花费相对较多的时间,因此使用多个 CPU 内核是有意义的。但是,这种利用应该只用于相对昂贵的测试,而不是用于读取/更新文件。这是因为读取/更新文件相对便宜。

下面是一些您可以使用的示例代码:

假设您有一个相对昂贵的测试方法:

private bool Test(string line)
{
//This test is expensive
}

这是一个可以利用多个 CPU 进行测试的代码示例:

这里我们将集合中的项目数限制为 10,以便从文件中读取的线程将等待其他线程 catch 进度,然后再从文件中读取更多行。

这个输入线程的读取速度比其他线程的测试速度快得多,所以在最坏的情况下,我们读取的行数将比测试线程完成测试的行数多 10 行。这确保我们有良好的内存消耗。

CancellationTokenSource cancellation_token_source = new CancellationTokenSource();

CancellationToken cancellation_token = cancellation_token_source.Token;

BlockingCollection<string> blocking_collection = new BlockingCollection<string>(10);

using (StreamReader reader = new StreamReader(new FileStream(filename, FileMode.Open, FileAccess.Read)))
{
using (
StreamWriter writer =
new StreamWriter(new FileStream(success_filename, FileMode.OpenOrCreate, FileAccess.Write)))
{

var input_task = Task.Factory.StartNew(() =>
{
try
{
while (!reader.EndOfStream)
{
if (cancellation_token.IsCancellationRequested)
return;

blocking_collection.Add(reader.ReadLine());
}
}
finally //In all cases, even in the case of an exception, we need to make sure that we mark that we have done adding to the collection so that the Parallel.ForEach loop will exit. Note that Parallel.ForEach will not exit until we call CompleteAdding
{
blocking_collection.CompleteAdding();
}
});


try
{
Parallel.ForEach(blocking_collection.GetConsumingEnumerable(), (line) =>
{
bool test_reault = Test(line);


if (test_reault)
{
lock (writer)
{
writer.WriteLine(line);
}
}
});
}
catch
{
cancellation_token_source.Cancel(); //If Paralle.ForEach throws an exception, we inform the input thread to stop
throw;
}

input_task.Wait(); //This will make sure that exceptions thrown in the input thread will be propagated here
}
}

关于c# - 具有大量文件IO任务的多线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32801639/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com