gpt4 book ai didi

c# - Parallel.ForEach 最后没有线程

转载 作者:行者123 更新时间:2023-11-30 17:12:32 25 4
gpt4 key购买 nike

我正在测试一个应该编译很多项目/文件的应用程序。

我有一个 ConucrrentBag,它应该与 Parallel 一起工作。

private readonly ConcurrentBag<string> m_files;

我对并行的呼吁是这样的:

Parallel.ForEach(m_files, new ParallelOptions
{
MaxDegreeOfParallelism = MaxProcesses,

}, currFile => ProcessSingle(currFile.ToString()));

MaxProcess 的数量为 LogicalCpu*2。

当我编译 140 个项目时,Parallel 将启动 linear less threads 到最后。至少最近 4 个项目只有一个线程在运行。这不太好,但没关系。

现在我的问题:

当我编译大约 14000 多个项目(它是 COBOL-SOURCE ;-) 和一个非常大的系统时)最后一个模块不会被编译,因为 Parallel.ForEach 没有为此启动新线程。此时没有事件的工作线程。但是 concurrentBag 中仍然有 140 个项目。

有人知道如何解决这个问题吗?

编辑: 该问题仅在我运行编译器时出现。没有运行编译器(为了更快的测试)它工作正常......

编辑:

当我启动 Parallel.ForEach 进程时,ConcurrentBag 已经完全填满。

详细信息,SingleProcess中的代码:

private void ProcessSingle(string item)
{
Monitor.Enter(lockingObj);
if (m_files.TryTake(out item))
{
if (CompilingModules <= 0)
{
OnQueueStarted(new EventArgs());
}
CompilingModules++;
Monitor.Exit(lockingObj);
OnQueueItemStateChanged(new ItemQueueEventArgs(item, null, ItemQueueType.Done, ItemQueueObject.String));

OnQueueItemStateChanged(new ItemQueueEventArgs(item, null, ItemQueueType.Dequeued, ItemQueueObject.String));
using (CobolCompiler compiler = new CobolCompiler())
{
compiler.OutputDataReceived += (sender, e) => OnOutputDataReceived(e);
compiler.Compile(item);
Thread.Sleep(2000);
if (compiler.LinkFailure)
{
if (ObjWithoutDll.ContainsKey(item))
{
if (ObjWithoutDll[item] <= 2)
{
m_files.Add(item);
OnQueueItemStateChanged(new ItemQueueEventArgs(item, null, ItemQueueType.Enqueued, ItemQueueObject.String));
ObjWithoutDll[item]++;
}
else
{
OnQueueItemStateChanged(new ItemQueueEventArgs(item, null, ItemQueueType.LinkError, ItemQueueObject.String));
ObjWithoutDll.Remove(item);
}
}
else
{
ObjWithoutDll.Add(item, 0);
m_files.Add(item);
OnQueueItemStateChanged(new ItemQueueEventArgs(item, null, ItemQueueType.Enqueued, ItemQueueObject.String));
}
}
else
{
if (compiler.DllExisting)
{
ObjWithoutDll.Remove(item);
}
OnQueueItemStateChanged(compiler.DllExisting ? new ItemQueueEventArgs(item, null, ItemQueueType.Done, ItemQueueObject.String) : new ItemQueueEventArgs(item, null, ItemQueueType.Failed, ItemQueueObject.String));
}

}

Monitor.Enter(lockingObj);
CompiledModules++;
if (CompiledModules % 300 == 0)
{
Thread.Sleep(60000);
}
CompilingModules--;
if (CompilingModules <= 0 && m_files.Count <= 0)
{

try
{
Process prReschk = new Process();
FileInfo batch = new FileInfo(@"batches\reschkdlg.cmd");
if (!batch.Exists)
{
Assembly _assembly = Assembly.GetExecutingAssembly();
StreamReader _textStreamReader = new StreamReader(_assembly.GetManifestResourceStream(@"Batches\reschkdlg.cmd"));
}

if (!File.Exists(Config.Instance.WorkingDir + @"reschkdlg.exe"))
{
File.Copy(Config.Instance.VersionExeDirectory + @"reschkdlg.exe", Config.Instance.WorkingDir + @"reschkdlg.exe");
}

prReschk.StartInfo.FileName = @"cmd.exe";
prReschk.StartInfo.Arguments = @"/c " + batch.FullName + " " + Config.Instance.Version.Replace(".", "") + " " + @"*" + " " + Config.Instance.WorkingDir;
prReschk.StartInfo.CreateNoWindow = true;
prReschk.StartInfo.UseShellExecute = false;
prReschk.Start();
prReschk.Close();
prReschk.Dispose();
}
catch
{
}

OnQueueFinished(new EventArgs());
}
}
Monitor.Exit(lockingObj);
}

这里是 CobolCompiler 类的代码片段:

公共(public)无效编译(字符串文件) {

        file = file.ToLower();

Process prCompile = new Process();
Dir = Directory.CreateDirectory(c.WorkingDir + random.Next() + "\\");

try
{
// First clean up the folder
CleanUpFolder(true, file);

// First set lock and copy all sources
Monitor.Enter(lockingObj);
if (filesToCopy == null)
{
CopySource(Dir.FullName);
}
Monitor.Exit(lockingObj);

FileInfo batch = new FileInfo(@"batches\compile.cmd");
if (!batch.Exists)
{
Assembly _assembly = Assembly.GetExecutingAssembly();
StreamReader _textStreamReader = new StreamReader(_assembly.GetManifestResourceStream(@"Batches\compile.cmd"));
_textStreamReader.Dispose();
}

prCompile.StartInfo.FileName = @"cmd.exe";
prCompile.StartInfo.Arguments = @"/c " + batch.FullName + " " + c.Version.Replace(".", "") + " " + file.Remove(file.LastIndexOf('.')) + " " + Dir.FullName + " " + Dir.FullName.Remove(Dir.FullName.IndexOf(@"\"));
prCompile.StartInfo.CreateNoWindow = true;
prCompile.StartInfo.UseShellExecute = false;
prCompile.StartInfo.RedirectStandardOutput = true;
prCompile.StartInfo.RedirectStandardError = true;
prCompile.StartInfo.WorkingDirectory = Assembly.GetExecutingAssembly().Location.Remove(Assembly.GetExecutingAssembly().Location.LastIndexOf("\\") + 1);
prCompile.EnableRaisingEvents = true;
prCompile.OutputDataReceived += prCompile_OutputDataReceived;
prCompile.ErrorDataReceived += prCompile_OutputDataReceived;
prCompile.Start();
prCompile.BeginErrorReadLine();
prCompile.BeginOutputReadLine();
prCompile.WaitForExit();
prCompile.Close();
prCompile.Dispose();

CleanUpFolder(false, file);

if (File.Exists(Config.Instance.WorkingDir + file.Remove(file.LastIndexOf('.')) + ".dll") || File.Exists(Config.Instance.WorkingDir + file.Remove(file.LastIndexOf('.')) + ".exe"))
{
dllExisting = true;
linkFailure = false;
}
else
{
if (File.Exists(Config.Instance.WorkingDir + file.Remove(file.LastIndexOf('.')) + ".obj"))
{
linkFailure = true;
}
dllExisting = false;
}



}
catch (ThreadAbortException)
{
if (prCompile != null)
{
// On Error kill process
prCompile.Kill();
prCompile.Dispose();
}
}
catch (Win32Exception)
{
}
catch (Exception)
{
dllExisting = false;
}

while (true)
{
try
{
if (Directory.Exists(Dir.FullName))
{
Directory.Delete(Dir.FullName, true);
break;
}
else
{
break;
}
}
catch
{
}
}


}
private void CopySource(string Destination)
{
filesToCopy = new StringCollection();
foreach (string strFile in Directory.GetFiles(c.WorkingDir))
{
string tmpStrFile = strFile.ToLower();

foreach (string Extension in c.Extensions)
{
if (tmpStrFile.Contains(Extension))
{
filesToCopy.Add(tmpStrFile);
}
}
}

if (filesToCopy.Count > 0)
{
foreach (string strFile in filesToCopy)
{
File.Copy(strFile, Destination + strFile.Remove(0, strFile.LastIndexOf("\\")));
}
}
}

private void CleanUpFolder(bool PreCleanup, string Filename)
{
//Copy all files from compilationfolder to working directory
if (!PreCleanup)
{
foreach (string strFile in Directory.GetFiles(Dir.FullName, Filename.Remove(Filename.LastIndexOf(".") + 1) + "*"))
{
FileInfo fileToMove = new FileInfo(strFile);

if (fileToMove.Name.ToLower().Contains(Filename.Remove(Filename.LastIndexOf("."))))
{
File.Copy(strFile, c.WorkingDir + fileToMove.Name, true);
}
}
}

//Delete useless files
foreach (string filename in Directory.GetFiles(Config.Instance.WorkingDir, Filename.Remove(Filename.LastIndexOf("."))+".*"))
{
bool foundExt = c.Extensions.Contains(filename.Remove(0, filename.LastIndexOf(".") + 1));
if (PreCleanup)
{
// Only delete files, which are not won't be compiled
if(!foundExt)
{
File.Delete(filename);
}
}
else
{
if (!Config.Instance.SaveLspFile && filename.Contains(".lsp"))
{
File.Delete(filename);
}

if (!Config.Instance.SaveLstFile && filename.Contains(".lst"))
{
File.Delete(filename);
}
}
}
}

public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}

protected virtual void Dispose(bool disposing)
{
if (!disposed)
{
if (disposing)
{
Dir = null;
}
disposed = true;
}
}

~CobolCompiler()
{
Dispose (false);
}

我只是尝试在每次编译过程后休眠两秒钟。但这不会改变任何东西。

在编译过程中,CPU 处于 100%。该应用程序正在收集 270 MB RAM。一开始它只有 35MB。

别害怕,我必须将所有源代码复制到临时文件夹,因为编译器无法在同一工作目录中同时编译多个文件。

编辑:我已经解决了没有线程但仍然有项目的问题。

在 ProcessSingle 中,我添加了我尝试再次编译的项目,当时它没有链接到 dll。

因此,我从 14000 个项目开始,并在处理 Parallel.ForEach 时再次将项目(如果它们链接失败)添加到此 concurrentBag。所以我结束了 14000 次 ForEach 运行,并且有 xxx 模块必须再次编译。 :-(

我没看到。旨在不带 WaitForExit 的情况下运行 prReschk。因为检查 14000 多个项目的资源需要很长时间,不应妨碍新的编译。

但是ConcurrentBag尾部线程少的问题依然存在:(不过只是在循环量大的时候才注意到。

最佳答案

Parallel.ForEach 方法将使用 .Net ThreadPool 来分配线程。并行运行的实际线程数将由线程池根据系统 CPU 的负载进行管理。因此,您可能已经指定了 MaxDegreeOfParallelism 但这只是最大值,ThreadPool 可能会决定分配比该最大值更少的线程。

根据您在问题中提供的证据,在我看来编译过程正在耗尽系统资源而不是事后清理。这可以解释为什么 140 次编译最终导致分配的线程数量逐渐减少 - ThreadPool 没有分配新线程,因为它认为 CPU 负载过重。

我会更仔细地研究编译过程是如何终止的。 ProcessSingle 方法是否在编译完全完成之前返回?编译过程中是否存在内存泄漏?

作为实验,我很想知道如果您在调用 ProcessSingle 后添加以下行,它的行为是否会有所不同:

 System.Threading.Thread.Sleep(2000);

这将暂停线程两秒钟,然后再将控制权交还给线程池以分配下一个任务。如果它改善了您的应用程序的行为,那么它强烈表明我的理论是正确的。

关于c# - Parallel.ForEach 最后没有线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/10583901/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com