gpt4 book ai didi

c# - 查询队列和线程安全

转载 作者:行者123 更新时间:2023-11-30 12:13:20 25 4
gpt4 key购买 nike

线程安全不是我担心的方面,因为我编写的简单应用程序和库通常只在主线程上运行,或者不直接修改我需要担心的任何类中的属性或字段之前。

但是,我已经开始进行个人项目,我正在使用 WebClient 从远程服务器异步下载数据。有一个 Queue<Uri>,其中包含一系列用于下载数据的 URI 的预构建队列。

因此请考虑以下代码片段(这不是我的真实代码,但我希望能说明我的问题:

private WebClient webClient = new WebClient();
private Queue<Uri> requestQueue = new Queue<Uri>();

public Boolean DownloadNextASync()
{
if (webClient.IsBusy)
return false;

if (requestQueue.Count == 0)
return false

var uri = requestQueue.Dequeue();

webClient.DownloadDataASync(uri);

return true;

}

如果我理解正确,这个方法不是线程安全的(假设这个对象的这个特定实例为多个线程所知)。我的推理是 webClient 可能会在 IsBusy 检查和 DownloadDataASync() 方法调用之间的时间内变得繁忙。而且,在 requestQueue 检查和下一个项目出列之间,Count 可能会变空。

我的问题是处理这种情况以使其线程安全的最佳方法是什么?

这更像是一个抽象的问题,因为我意识到对于这个特定的方法,必须有一个非常不方便的时间才能真正导致问题,并且为了涵盖这种情况,我可以将方法包装在适当的 try-catch 中因为这两部分都会抛出异常。但是还有另一种选择吗? lock 语句是否适用于此?

最佳答案

如果您的目标是 .Net 4.0,您可以使用任务并行库寻求帮助:

var queue = new BlockingCollection<Uri>();
var maxClients = 4;

// Optionally provide another producer/consumer collection for the data
// var data = new BlockingCollection<Tuple<Uri,byte[]>>();

// Optionally implement CancellationTokenSource

var clients = from id in Enumerable.Range(0, maxClients)
select Task.Factory.StartNew(
() =>
{
var client = new WebClient();
while (!queue.IsCompleted)
{
Uri uri;
if (queue.TryTake(out uri))
{
byte[] datum = client.DownloadData(uri); // already "async"
// Optionally pass datum along to the other collection
// or work on it here
}
else Thread.SpinWait(100);
}
});

// Add URI's to search
// queue.Add(...);

// Notify our clients that we've added all the URI's
queue.CompleteAdding();

// Wait for all of our clients to finish
clients.WaitAll();

要将此方法用于进度指示,您可以使用 TaskCompletionSource<TResult>管理基于事件的并行性:

public static Task<byte[]> DownloadAsync(Uri uri, Action<double> progress)
{
var source = new TaskCompletionSource<byte[]>();
Task.Factory.StartNew(
() =>
{
var client = new WebClient();
client.DownloadProgressChanged
+= (sender, e) => progress(e.ProgressPercentage);
client.DownloadDataCompleted
+= (sender, e) =>
{
if (!e.Cancelled)
{
if (e.Error == null)
{
source.SetResult((byte[])e.Result);
}
else
{
source.SetException(e.Error);
}
}
else
{
source.SetCanceled();
}
};
});

return source.Task;
}

这样使用:

// var urls = new List<Uri>(...);
// var progressBar = new ProgressBar();

Task.Factory.StartNew(
() =>
{
foreach (var uri in urls)
{
var task = DownloadAsync(
uri,
p =>
progressBar.Invoke(
new MethodInvoker(
delegate { progressBar.Value = (int)(100 * p); }))
);

// Will Block!
// data = task.Result;
}
});

关于c# - 查询队列和线程安全,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/11851147/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com