- r - 以节省内存的方式增长 data.frame
- ruby-on-rails - ruby/ruby on rails 内存泄漏检测
- android - 无法解析导入android.support.v7.app
- UNIX 域套接字与共享内存(映射文件)
我必须编写一个调用 Microsoft Dynamics CRM Web 服务的控制台应用程序来对八千多个 CRM 对象执行操作。 Web 服务调用的细节无关紧要,此处未显示,但我需要一个多线程客户端,以便我可以并行调用。我希望能够控制配置设置中使用的线程数量,并且如果服务错误数量达到配置定义的阈值,应用程序也可以取消整个操作。
我使用任务并行库 Task.Run 和 ContinueWith 编写它,跟踪正在进行的调用(线程)数量、我们收到的错误数量以及用户是否已从键盘取消。一切正常,我进行了大量的日志记录,以确保线程干净利落地完成,并且在运行结束时一切都井井有条。我可以看到该程序正在使用最大并行线程数,如果达到最大限制,则等待一个正在运行的任务完成,然后再开始另一个任务。
在我的代码审查期间,我的同事建议最好使用 async/await 而不是 tasks 和 continuations,所以我创建了一个分支并以这种方式重写了它。结果很有趣——async/await 版本的速度几乎是原来的两倍,而且它从未达到允许的最大并行操作/线程数。 TPL 始终达到 10 个并行线程,而异步/等待版本从未超过 5 个。
我的问题是:我编写异步/等待代码(甚至 TPL 代码)的方式是否有误?如果我没有编码错误,您能否解释为什么 async/await 效率较低,这是否意味着继续使用 TPL 进行多线程代码会更好。
请注意,我测试的代码实际上并未调用 CRM - CrmClient 类只是线程休眠了配置中指定的持续时间(五秒),然后抛出异常。这意味着没有可能影响性能的外部变量。
为了这个问题的目的,我创建了一个结合了两个版本的精简程序;调用哪一个由配置设置决定。它们中的每一个都从设置环境的 Bootstrap 运行器开始,创建队列类,然后使用 TaskCompletionSource 等待完成。 CancellationTokenSource 用于向用户发出取消信号。要处理的 ID 列表从嵌入式文件中读取并推送到 ConcurrentQueue。他们都开始调用 StartCrmRequest 的次数与 max-threads 一样;随后,每次处理结果时,ProcessResult 方法都会再次调用 StartCrmRequest,一直持续到处理完我们所有的 ID。
您可以从这里克隆/下载完整的程序:https://bitbucket.org/kentrob/pmgfixso/
相关配置如下:
<appSettings>
<add key="TellUserAfterNCalls" value="5"/>
<add key="CrmErrorsBeforeQuitting" value="20"/>
<add key="MaxThreads" value="10"/>
<add key="CallIntervalMsecs" value="5000"/>
<add key="UseAsyncAwait" value="True" />
</appSettings>
从 TPL 版本开始,这里是启动队列管理器的引导运行程序:
public static class TplRunner
{
private static readonly CancellationTokenSource CancellationTokenSource = new CancellationTokenSource();
public static void StartQueue(RuntimeParameters parameters, IEnumerable<string> idList)
{
Console.CancelKeyPress += (s, args) =>
{
CancelCrmClient();
args.Cancel = true;
};
var start = DateTime.Now;
Program.TellUser("Start: " + start);
var taskCompletionSource = new TplQueue(parameters)
.Start(CancellationTokenSource.Token, idList);
while (!taskCompletionSource.Task.IsCompleted)
{
if (Console.KeyAvailable)
{
if (Console.ReadKey().Key != ConsoleKey.Q) continue;
Console.WriteLine("When all threads are complete, press any key to continue.");
CancelCrmClient();
}
}
var end = DateTime.Now;
Program.TellUser("End: {0}. Elapsed = {1} secs.", end, (end - start).TotalSeconds);
}
private static void CancelCrmClient()
{
CancellationTokenSource.Cancel();
Console.WriteLine("Cancelling Crm client. Web service calls in operation will have to run to completion.");
}
}
这是 TPL 队列管理器本身:
public class TplQueue
{
private readonly RuntimeParameters parameters;
private readonly object locker = new object();
private ConcurrentQueue<string> idQueue = new ConcurrentQueue<string>();
private readonly CrmClient crmClient;
private readonly TaskCompletionSource<bool> taskCompletionSource = new TaskCompletionSource<bool>();
private int threadCount;
private int crmErrorCount;
private int processedCount;
private CancellationToken cancelToken;
public TplQueue(RuntimeParameters parameters)
{
this.parameters = parameters;
crmClient = new CrmClient();
}
public TaskCompletionSource<bool> Start(CancellationToken cancellationToken, IEnumerable<string> ids)
{
cancelToken = cancellationToken;
foreach (var id in ids)
{
idQueue.Enqueue(id);
}
threadCount = 0;
// Prime our thread pump with max threads.
for (var i = 0; i < parameters.MaxThreads; i++)
{
Task.Run((Action) StartCrmRequest, cancellationToken);
}
return taskCompletionSource;
}
private void StartCrmRequest()
{
if (taskCompletionSource.Task.IsCompleted)
{
return;
}
if (cancelToken.IsCancellationRequested)
{
Program.TellUser("Crm client cancelling...");
ClearQueue();
return;
}
var count = GetThreadCount();
if (count >= parameters.MaxThreads)
{
return;
}
string id;
if (!idQueue.TryDequeue(out id)) return;
IncrementThreadCount();
crmClient.CompleteActivityAsync(new Guid(id), parameters.CallIntervalMsecs).ContinueWith(ProcessResult);
processedCount += 1;
if (parameters.TellUserAfterNCalls > 0 && processedCount%parameters.TellUserAfterNCalls == 0)
{
ShowProgress(processedCount);
}
}
private void ProcessResult(Task<CrmResultMessage> response)
{
if (response.Result.CrmResult == CrmResult.Failed && ++crmErrorCount == parameters.CrmErrorsBeforeQuitting)
{
Program.TellUser(
"Quitting because CRM error count is equal to {0}. Already queued web service calls will have to run to completion.",
crmErrorCount);
ClearQueue();
}
var count = DecrementThreadCount();
if (idQueue.Count == 0 && count == 0)
{
taskCompletionSource.SetResult(true);
}
else
{
StartCrmRequest();
}
}
private int GetThreadCount()
{
lock (locker)
{
return threadCount;
}
}
private void IncrementThreadCount()
{
lock (locker)
{
threadCount = threadCount + 1;
}
}
private int DecrementThreadCount()
{
lock (locker)
{
threadCount = threadCount - 1;
return threadCount;
}
}
private void ClearQueue()
{
idQueue = new ConcurrentQueue<string>();
}
private static void ShowProgress(int processedCount)
{
Program.TellUser("{0} activities processed.", processedCount);
}
}
请注意,我知道有几个计数器不是线程安全的,但它们并不重要; threadCount 变量是唯一的关键变量。
这是虚拟 CRM 客户端方法:
public Task<CrmResultMessage> CompleteActivityAsync(Guid activityId, int callIntervalMsecs)
{
// Here we would normally call a CRM web service.
return Task.Run(() =>
{
try
{
if (callIntervalMsecs > 0)
{
Thread.Sleep(callIntervalMsecs);
}
throw new ApplicationException("Crm web service not available at the moment.");
}
catch
{
return new CrmResultMessage(activityId, CrmResult.Failed);
}
});
}
下面是相同的 async/await 类(为了简洁起见删除了常用方法):
public static class AsyncRunner
{
private static readonly CancellationTokenSource CancellationTokenSource = new CancellationTokenSource();
public static void StartQueue(RuntimeParameters parameters, IEnumerable<string> idList)
{
var start = DateTime.Now;
Program.TellUser("Start: " + start);
var taskCompletionSource = new AsyncQueue(parameters)
.StartAsync(CancellationTokenSource.Token, idList).Result;
while (!taskCompletionSource.Task.IsCompleted)
{
...
}
var end = DateTime.Now;
Program.TellUser("End: {0}. Elapsed = {1} secs.", end, (end - start).TotalSeconds);
}
}
异步/等待队列管理器:
public class AsyncQueue
{
private readonly RuntimeParameters parameters;
private readonly object locker = new object();
private readonly CrmClient crmClient;
private readonly TaskCompletionSource<bool> taskCompletionSource = new TaskCompletionSource<bool>();
private CancellationToken cancelToken;
private ConcurrentQueue<string> idQueue = new ConcurrentQueue<string>();
private int threadCount;
private int crmErrorCount;
private int processedCount;
public AsyncQueue(RuntimeParameters parameters)
{
this.parameters = parameters;
crmClient = new CrmClient();
}
public async Task<TaskCompletionSource<bool>> StartAsync(CancellationToken cancellationToken,
IEnumerable<string> ids)
{
cancelToken = cancellationToken;
foreach (var id in ids)
{
idQueue.Enqueue(id);
}
threadCount = 0;
// Prime our thread pump with max threads.
for (var i = 0; i < parameters.MaxThreads; i++)
{
await StartCrmRequest();
}
return taskCompletionSource;
}
private async Task StartCrmRequest()
{
if (taskCompletionSource.Task.IsCompleted)
{
return;
}
if (cancelToken.IsCancellationRequested)
{
...
return;
}
var count = GetThreadCount();
if (count >= parameters.MaxThreads)
{
return;
}
string id;
if (!idQueue.TryDequeue(out id)) return;
IncrementThreadCount();
var crmMessage = await crmClient.CompleteActivityAsync(new Guid(id), parameters.CallIntervalMsecs);
ProcessResult(crmMessage);
processedCount += 1;
if (parameters.TellUserAfterNCalls > 0 && processedCount%parameters.TellUserAfterNCalls == 0)
{
ShowProgress(processedCount);
}
}
private async void ProcessResult(CrmResultMessage response)
{
if (response.CrmResult == CrmResult.Failed && ++crmErrorCount == parameters.CrmErrorsBeforeQuitting)
{
Program.TellUser(
"Quitting because CRM error count is equal to {0}. Already queued web service calls will have to run to completion.",
crmErrorCount);
ClearQueue();
}
var count = DecrementThreadCount();
if (idQueue.Count == 0 && count == 0)
{
taskCompletionSource.SetResult(true);
}
else
{
await StartCrmRequest();
}
}
}
因此,将 MaxThreads 设置为 10 并将 CrmErrorsBeforeQuitting 设置为 20,我机器上的 TPL 版本在 19 秒内完成,而异步/等待版本需要 35 秒。考虑到我有超过 8000 个电话,所以这是一个显着的差异。有什么想法吗?
最佳答案
我想我在这里看到了问题,或者至少是其中的一部分。仔细看下面的两段代码;它们不等价。
// Prime our thread pump with max threads.
for (var i = 0; i < parameters.MaxThreads; i++)
{
Task.Run((Action) StartCrmRequest, cancellationToken);
}
和:
// Prime our thread pump with max threads.
for (var i = 0; i < parameters.MaxThreads; i++)
{
await StartCrmRequest();
}
在原始代码中(我认为它在功能上是合理的)有一个对 ContinueWith
的调用。如果要保留原始行为,那正是我希望在微不足道的重写中看到的 await
语句的数量。
这不是硬性规定,只适用于简单的情况,但仍然是一件值得留意的好事。
关于c# - 为什么这个 TAP 异步/等待代码比 TPL 版本慢?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24389549/
无法使用 Hive 版本 1.1.0 HBase 版本 0.94.8 和 hadoop 版本 2.7.0 从 hive 创建 Hbase 表 hive (default)> CREATE TABLE
我试图为 electron app 创建可执行文件但面临这个问题 Unable to determine Electron version. Please specify an Electron ve
我正在尝试让自适应阈值在 python 绑定(bind)到 opencv 中工作(swig 一个 - 无法让 opencv 2.0 工作,因为我正在使用 beagleboard 因为交叉编译还没有工作
我一直在 linux 机器上使用 JMeter,在命令行下使用了一段时间。工作正常。 今天,我在 Windows 机器(新客户端等)上尝试了它,它确实可以工作,但在控制台窗口中输出有很大不同。 Lin
在我的编码环境中,我通常使用最新版本的 Java 和 Eclipse。当我编写源代码时,我不会注意我使用的 API 方法或类是否向后兼容旧版本的 Java 或 Eclipse。在 javadoc 中存
问题是关于版本的特定组合,但更普遍。 我刚刚从 Kubuntu 12.04 升级到 14.04。现在,当我想编译 CUDA 代码(使用 CUDA 6.5)时,我得到: #error -- unsupp
我目前正在对我的一些应用程序进行沙箱处理,看来我必须删除一些功能才能满足 Mac App Store 沙箱(和其他)规则。 显然用户不会因为失去功能而感到高兴,我担心他们不会指责苹果制定了愚蠢的规则,
我用 flash 和 js 版本创建了一个动画横幅。 是否可以检测低于版本 9 的 ie 版本,然后提供 Flash 横幅,否则提供 js 横幅。 最佳答案 您可以使用条件注释来检测 IE 版本
我有一个处理不同位置的数据库的应用程序,我想检查这些数据库是否使用 Firebird 2.5 或更高版本打开。我们最近从 Firebird 2.0 迁移到了 2.5,我们有很多数据库可以响应 sele
我正在开发一个应用程序,我使用托管在我的服务器上的 Java 和 Jersey 构建了后端部分。我在服务器上使用 Tomcat7 来调用 Web 服务。 我以前有一台安装了 Ubuntu 的计算机,我
我可以使用 GetVersionEx() 函数来获取 Windows 版本,但是这个函数将返回一个数字而不是一个字符串。但是没有问题,因为我可以将数字转换为字符串,例如: if (osvi.dwMaj
我已经在我的系统中安装了 Anaconda 2 & 3。 Anaconda 2 包含 python 2.7 & Anaconda 3 包含 python 3.6。 我需要使用命令提示符运行我的 pyt
我正在尝试构建一个 Android 项目,但发生了以下错误 Error:(10, 1) A problem occurred evaluating project ':app'. > Failed t
关闭。这个问题需要更多focused .它目前不接受答案。 想改进这个问题吗? 更新问题,使其只关注一个问题 editing this post . 关闭 4 年前。 Improve this qu
在降级我的 GCC 之前,我想知道是否有办法确定我的机器中的哪些程序/框架或依赖项会中断,以及是否有更好的方法来执行 openpose 安装? (例如,在 CMake 中更改某些内容) 有没有办法在不
我已经在终端的代码sudo apt-get install Shadowsocks-qt5中安装了Shadowsocks-Qt5,然后我可以通过搜索找到启动图标,但是它当我点击图标时打不开。然后我尝试
在网络上找到的文档说,MLLP V2(第 2 版)是用于传输 HL7 版本 3 内容的所有消息传输协议(protocol)的要求。似乎 MLLP 第 2 版主要用于 HL7 第 3 版。 我们可以/应
我正在使用带有 selinium webdriver 的 Protractor 。我的chromeDriver版本是78.0.1,chrome版本是78.0.3904.97。两个版本都匹配,应该不会有
我正在按照教程设置 mysql 数据库并做一些事情。我无法找到数据库资源管理器。我读了很多,但在 Window->show View-> Dataxxx 或右侧上部选项卡中无法正常工作。 最佳答案 从
我已经在 KDE 桌面上安装了 Anaconda 2.0.1。当我运行 python 并看到所有已安装的模块时,我收到此消息“无法将不兼容的 Qt 库(版本 0x40801)与该库(版本 0x4080
我是一名优秀的程序员,十分优秀!