
c# - Using threads (tasks) for work that includes I/O


I need to read data from one file, process it, and write the result to another file. I use a BackgroundWorker to show the progress of the operation. I wrote something like this for the BackgroundWorker's DoWork event:

private void ProcData(string fileToRead, string fileToWrite)
{
    byte[] buffer = new byte[4 * 1024];

    // fileToRead & fileToWrite have the same size
    FileInfo fileInfo = new FileInfo(fileToRead);

    using (FileStream streamReader = new FileStream(fileToRead, FileMode.Open))
    using (BinaryReader binaryReader = new BinaryReader(streamReader))
    using (FileStream streamWriter = new FileStream(fileToWrite, FileMode.Open))
    using (BinaryWriter binaryWriter = new BinaryWriter(streamWriter))
    {
        while (streamWriter.Position < fileInfo.Length)
        {
            // shrink the buffer for the final, shorter block
            if (streamWriter.Position + buffer.Length > fileInfo.Length)
            {
                buffer = new byte[fileInfo.Length - streamWriter.Position];
            }

            // read
            buffer = binaryReader.ReadBytes(buffer.Length);

            // process
            Proc(buffer);

            // write
            binaryWriter.Write(buffer);

            // report if the percentage changed
            // ...

        } // while
    } // using
}

But it is about 5 times slower than simply reading from fileToRead and writing to fileToWrite, so I thought about threads. I read some questions on the site and tried something similar, based on this question:

private void ProcData2(string fileToRead, string fileToWrite)
{
    int threadNumber = 4; // for example

    Task[] tasks = new Task[threadNumber];

    long[] startByte = new long[threadNumber];
    long[] length = new long[threadNumber];

    // divide the file into threadNumber (4) parts
    // and fill startByte & length

    var parentTask = Task.Run(() =>
    {
        for (int i = 0; i < threadNumber; i++)
        {
            int part = i; // copy the loop variable so each task captures its own index
            tasks[part] = Task.Factory.StartNew(() =>
            {
                Proc2(fileToRead, fileToWrite, startByte[part], length[part]);
            });
        }
    });

    parentTask.Wait();

    Task.WaitAll(tasks);
}

private void Proc2(string fileToRead, string fileToWrite, long fileStartByte, long partLength)
{
    byte[] buffer = new byte[4 * 1024];

    using (FileStream streamReader = new FileStream(fileToRead, FileMode.Open, FileAccess.Read, FileShare.Read))
    using (BinaryReader binaryReader = new BinaryReader(streamReader))
    using (FileStream streamWriter = new FileStream(fileToWrite, FileMode.Open, FileAccess.Write, FileShare.Write))
    using (BinaryWriter binaryWriter = new BinaryWriter(streamWriter))
    {
        streamReader.Seek(fileStartByte, SeekOrigin.Begin);
        streamWriter.Seek(fileStartByte, SeekOrigin.Begin);

        while (streamWriter.Position < fileStartByte + partLength)
        {
            // shrink the buffer for the final, shorter block of this part
            if (streamWriter.Position + buffer.Length > fileStartByte + partLength)
            {
                buffer = new byte[fileStartByte + partLength - streamWriter.Position];
            }

            // read
            buffer = binaryReader.ReadBytes(buffer.Length);

            // process
            Proc(buffer);

            // write
            binaryWriter.Write(buffer);

            // report if the percentage changed
            // ...

        } // while
    } // using
}

But I think it has some problems: every time the tasks switch, the streams have to seek again. I also thought about reading the file, using threads only for Proc(), and then writing the result, but that seems wrong. How can I do this properly? (Read a buffer from one file, process it with tasks, and write it to another file.)

//==================================================================

Based on Pete Kirkham's post, I modified my method. I don't know why, but it did not work for me. I am adding the new method here for anyone it might help. Thanks, everyone.

private void ProcData3(string fileToRead, string fileToWrite)
{
    int bufferSize = 4 * 1024;
    int threadNumber = 4; // example
    List<byte[]> bufferPool = new List<byte[]>();

    // fileToRead & fileToWrite have the same size
    FileInfo fileInfo = new FileInfo(fileToRead);

    using (FileStream streamReader = new FileStream(fileToRead, FileMode.Open))
    using (BinaryReader binaryReader = new BinaryReader(streamReader))
    using (FileStream streamWriter = new FileStream(fileToWrite, FileMode.Open))
    using (BinaryWriter binaryWriter = new BinaryWriter(streamWriter))
    {
        while (streamWriter.Position < fileInfo.Length)
        {
            // start each pass with an empty pool, otherwise buffers that were
            // already written pile up and get written again
            bufferPool.Clear();

            // read up to threadNumber blocks
            for (int g = 0; g < threadNumber; g++)
            {
                if (streamReader.Position + bufferSize <= fileInfo.Length)
                {
                    bufferPool.Add(binaryReader.ReadBytes(bufferSize));
                }
                else
                {
                    bufferPool.Add(binaryReader.ReadBytes((int)(fileInfo.Length - streamReader.Position)));
                    break;
                }
            }

            // process: one task per block that was actually read
            Task[] tasks = new Task[bufferPool.Count];

            var parentTask = Task.Run(() =>
            {
                for (int th = 0; th < bufferPool.Count; th++)
                {
                    int index = th; // copy the loop variable for the closure

                    // threads
                    tasks[index] = Task.Factory.StartNew(() =>
                    {
                        Proc(bufferPool[index]);
                    });

                } // for th
            });

            // wait for the parent task (which creates the child tasks)
            parentTask.Wait();

            // wait until all tasks are done
            Task.WaitAll(tasks);

            // write
            for (int g = 0; g < bufferPool.Count; g++)
            {
                binaryWriter.Write(bufferPool[g]);
            }

            // report if the percentage changed
            // ...

        } // while
    } // using
}

Best Answer

Essentially, you want to split the processing of the data into parallel tasks, but you don't want to split up the I/O.

How that happens depends on the size of your data. If it is small enough to fit in memory, you can read it all into an input array and create an output array, then create tasks that each process part of the input array and fill part of the output array, and finally write the whole output array to the file.
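If it helps, here is a minimal sketch of that in-memory variant. It reuses the question's Proc(byte[]) and assumes Proc transforms the buffer it is given in place; the method name and the 4 KB chunk size are just illustrative, not part of the answer.

private void ProcDataInMemory(string fileToRead, string fileToWrite)
{
    // Read the whole input into memory (only viable when the file fits in RAM).
    byte[] data = File.ReadAllBytes(fileToRead);

    int chunkSize = 4 * 1024;
    int chunkCount = (data.Length + chunkSize - 1) / chunkSize;

    // Process the chunks in parallel; each task works on its own slice,
    // so no locking is needed as long as Proc only touches the bytes it is given.
    Parallel.For(0, chunkCount, chunk =>
    {
        int offset = chunk * chunkSize;
        int length = Math.Min(chunkSize, data.Length - offset);

        byte[] buffer = new byte[length];
        Buffer.BlockCopy(data, offset, buffer, 0, length);

        Proc(buffer); // same processing routine as in the question

        Buffer.BlockCopy(buffer, 0, data, offset, length);
    });

    // Single sequential write of the whole result.
    File.WriteAllBytes(fileToWrite, data);
}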

If the data is too large for that, you need to limit how much is read and written at a time. So your main flow starts by reading N blocks of data and creating N tasks to process them. You then wait for the tasks to complete in order; each time one completes, you write its output block, read a new input block, and create another task. Some experimentation is needed to find a good N and block size, so that the tasks tend to complete at roughly the same rate the I/O works at.
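One possible shape for that pipeline is sketched below, again reusing the question's Proc. The method name, the default block size and the number of blocks in flight are assumptions chosen for illustration; they are exactly the values the answer says you would tune by experiment.

private void ProcDataPipelined(string fileToRead, string fileToWrite,
                               int blockSize = 64 * 1024, int blocksInFlight = 4)
{
    using (FileStream reader = new FileStream(fileToRead, FileMode.Open, FileAccess.Read))
    using (FileStream writer = new FileStream(fileToWrite, FileMode.Create, FileAccess.Write))
    {
        // Blocks currently being processed, kept in file order.
        var inFlight = new Queue<Task<byte[]>>();

        // Reads one block and starts a task that processes it; returns null at end of input.
        Func<Task<byte[]>> readAndStart = () =>
        {
            byte[] buffer = new byte[blockSize];
            int read = reader.Read(buffer, 0, buffer.Length);
            if (read == 0)
                return null;
            if (read < buffer.Length)
                Array.Resize(ref buffer, read); // trim the last, shorter block

            return Task.Run(() => { Proc(buffer); return buffer; });
        };

        // Prime the pipeline with N blocks.
        for (int i = 0; i < blocksInFlight; i++)
        {
            Task<byte[]> t = readAndStart();
            if (t == null) break;
            inFlight.Enqueue(t);
        }

        // Wait for the oldest block, write it, then refill from the reader,
        // so reads, processing and writes overlap while output stays in order.
        while (inFlight.Count > 0)
        {
            byte[] done = inFlight.Dequeue().Result;
            writer.Write(done, 0, done.Length);

            Task<byte[]> next = readAndStart();
            if (next != null)
                inFlight.Enqueue(next);
        }
    }
}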

Regarding "c# - Using threads (tasks) for work that includes I/O", the original question is on Stack Overflow: https://stackoverflow.com/questions/38218692/
