- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我正在构建一个通用的 URI 检索系统。本质上有一个通用类 Retriever<T>
它维护一个要检索的 URI 队列。它有一个单独的线程,可以尽可能快地处理该队列。如问题标题中所示,URI 类型的一个示例是 HTTP 类型 URI。
问题是,当我开始通过抽象方法请求检索资源时 T RetrieveResource(Uri location)
,由于缺乏异步性,它变慢了。
更改 RetrieveResource
的返回类型至 Task<T>
是我的第一个想法。然而,当我们有数以千计的未完成任务时,这似乎会使任务堆积起来并导致很多问题。它似乎创建了许多实际线程而不是利用线程池。我想这只会减慢一切,因为同时发生的事情太多了,所以没有任何事情单独取得重大进展。
预计我们将有大量排队的项目要检索,并且无法像排队一样快速处理它们。随着时间的推移,系统有机会 catch ;但绝对不会很快。
我还考虑过不维护一个队列和一个线程来处理它...只是在 ThreadPool
上排队一个工作项.但是,如果说我需要在处理所有工作项之前关闭系统或稍后想要允许优先级排序或其他事情,我不确定这是否理想。
我们还知道,检索资源是一个耗时的过程(0.250 - 5 秒),但不一定是资源密集型过程。我们可以将其并行处理成数百个请求。
我们的要求是:
BlockingCollection
在这里很有用)。有没有一种好的方法可以在不引入不必要的复杂性的情况下将其并行化?
下面是我们现有的一些代码,作为示例。
public abstract class Retriever<T> : IRetriever<T>, IDisposable
{
private readonly Thread worker;
private readonly BlockingCollection<Uri> pending;
private volatile int isStarted;
private volatile int isDisposing;
public event EventHandler<RetrievalEventArgs<T>> Retrieved;
protected Retriever()
{
this.worker = new Thread(this.RetrieveResources);
this.pending = new BlockingCollection<Uri>(new ConcurrentQueue<Uri>());
this.isStarted = 0;
this.isDisposing = 0;
}
~Retriever()
{
this.Dispose(false);
}
private void RetrieveResources()
{
while (this.isDisposing == 0)
{
while (this.isStarted == 0)
{
Monitor.Wait(this.pending);
}
Uri location = this.pending.Take();
// This is what needs to be concurrently done.
// In this example, it's synchronous, but just on a separate thread.
T result = this.RetrieveResource(location);
// At this point, we would fire our event with the retrieved data
}
}
protected abstract T RetrieveResource(Uri location);
protected void Dispose(bool disposing)
{
if (Interlocked.CompareExchange(ref this.isDisposing, 1, 0) == 1)
{
return;
}
if (disposing)
{
this.pending.CompleteAdding();
this.worker.Join();
}
}
public void Add(Uri uri)
{
try
{
this.pending.Add(uri);
}
catch (InvalidOperationException)
{
return;
}
}
public void AddRange(IEnumerable<Uri> uris)
{
foreach (Uri uri in uris)
{
try
{
this.pending.Add(uri);
}
catch (InvalidOperationException)
{
return;
}
}
}
public void Start()
{
if (Interlocked.CompareExchange(ref this.isStarted, 1, 0) == 1)
{
throw new InvalidOperationException("The retriever is already started.");
}
if (this.worker.ThreadState == ThreadState.Unstarted)
{
this.worker.Start();
}
Monitor.Pulse(this.pending);
}
public void Stop()
{
if (Interlocked.CompareExchange(ref this.isStarted, 0, 1) == 0)
{
throw new InvalidOperationException("The retriever is already stopped.");
}
}
public void Dispose()
{
this.Dispose(true);
GC.SuppressFinalize(this);
}
}
要在上面的例子的基础上构建...我认为对此的解决方案增加了太多的复杂性或者更确切地说,奇怪的代码...就是这样。
private void RetrieveResources()
{
while (this.isDisposing == 0)
{
while (this.isStarted == 0)
{
Monitor.Wait(this.pending);
}
Uri location = this.pending.Take();
Task<T> task = new Task<T>((state) =>
{
return this.RetrieveResource(state as Uri);
}, location);
task.ContinueWith((t) =>
{
T result = t.Result;
RetrievalEventArgs<T> args = new RetrievalEventArgs<T>(location, result);
EventHandler<RetrievalEventArgs<T>> callback = this.Retrieved;
if (!Object.ReferenceEquals(callback, null))
{
callback(this, args);
}
});
task.Start();
}
}
最佳答案
我想出了一个我认为非常好的解决方案。我抽象了检索资源的方法和结果的表示。这允许支持检索具有任意结果的任意 URI;有点像一些 URI 驱动的“ORM”。
它支持可变的并发级别。前几天,当我发布这个问题时,我忘记了异步和并发是完全不同的,我用任务实现的只是异步和干扰任务调度程序,因为我真正想要的是并发。
我添加了取消,因为它似乎具有启动/停止功能是个好主意。
public abstract class Retriever<T> : IRetriever<T>
{
private readonly object locker;
private readonly BlockingCollection<Uri> pending;
private readonly Thread[] threads;
private CancellationTokenSource cancellation;
private volatile int isStarted;
private volatile int isDisposing;
public event EventHandler<RetrieverEventArgs<T>> Retrieved;
protected Retriever(int concurrency)
{
if (concurrency <= 0)
{
throw new ArgumentOutOfRangeException("concurrency", "The specified concurrency level must be greater than zero.");
}
this.locker = new object();
this.pending = new BlockingCollection<Uri>(new ConcurrentQueue<Uri>());
this.threads = new Thread[concurrency];
this.cancellation = new CancellationTokenSource();
this.isStarted = 0;
this.isDisposing = 0;
this.InitializeThreads();
}
~Retriever()
{
this.Dispose(false);
}
private void InitializeThreads()
{
for (int i = 0; i < this.threads.Length; i++)
{
Thread thread = new Thread(this.ProcessQueue)
{
IsBackground = true
};
this.threads[i] = thread;
}
}
private void StartThreads()
{
foreach (Thread thread in this.threads)
{
if (thread.ThreadState == ThreadState.Unstarted)
{
thread.Start();
}
}
}
private void CancelOperations(bool reset)
{
this.cancellation.Cancel();
this.cancellation.Dispose();
if (reset)
{
this.cancellation = new CancellationTokenSource();
}
}
private void WaitForThreadsToExit()
{
foreach (Thread thread in this.threads)
{
thread.Join();
}
}
private void ProcessQueue()
{
while (this.isDisposing == 0)
{
while (this.isStarted == 0)
{
Monitor.Wait(this.locker);
}
Uri location;
try
{
location = this.pending.Take(this.cancellation.Token);
}
catch (OperationCanceledException)
{
continue;
}
T data;
try
{
data = this.Retrieve(location, this.cancellation.Token);
}
catch (OperationCanceledException)
{
continue;
}
RetrieverEventArgs<T> args = new RetrieverEventArgs<T>(location, data);
EventHandler<RetrieverEventArgs<T>> callback = this.Retrieved;
if (!Object.ReferenceEquals(callback, null))
{
callback(this, args);
}
}
}
private void ThowIfDisposed()
{
if (this.isDisposing == 1)
{
throw new ObjectDisposedException("Retriever");
}
}
protected abstract T Retrieve(Uri location, CancellationToken token);
protected virtual void Dispose(bool disposing)
{
if (Interlocked.CompareExchange(ref this.isDisposing, 1, 0) == 1)
{
return;
}
if (disposing)
{
this.CancelOperations(false);
this.WaitForThreadsToExit();
this.pending.Dispose();
}
}
public void Start()
{
this.ThowIfDisposed();
if (Interlocked.CompareExchange(ref this.isStarted, 1, 0) == 1)
{
throw new InvalidOperationException("The retriever is already started.");
}
Monitor.PulseAll(this.locker);
this.StartThreads();
}
public void Add(Uri location)
{
this.pending.Add(location);
}
public void Stop()
{
this.ThowIfDisposed();
if (Interlocked.CompareExchange(ref this.isStarted, 0, 1) == 0)
{
throw new InvalidOperationException("The retriever is already stopped.");
}
this.CancelOperations(true);
}
public void Dispose()
{
this.Dispose(true);
GC.SuppressFinalize(this);
}
}
关于c# - 并行处理许多 HTTP Web 请求的好方法是什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/13026744/
对于 Metal ,如果对主纹理进行 mipmap 处理,是否还需要对多采样纹理进行 mipmap 处理?我阅读了苹果文档,但没有得到任何相关信息。 最佳答案 Mipmapping 适用于您将从中
我正在使用的代码在后端 Groovy 代码中具有呈现 GSP(Groovy 服务器页面)的 Controller 。对于前端,我们使用 React-router v4 来处理路由。我遇到的问题是,通过
我们正在 build 一个巨大的网站。我们正在考虑是在服务器端(ASP .Net)还是在客户端进行 HTML 处理。 例如,我们有 HTML 文件,其作用类似于用于生成选项卡的模板。服务器端获取 HT
我正在尝试将图像加载到 void setup() 中的数组中,但是当我这样做时出现此错误:“类型不匹配,'processing .core.PImage' does not匹配“processing.
我正在尝试使用其私有(private)应用程序更新 Shopify 上的客户标签。我用 postman 尝试过,一切正常,但通过 AJAX,它带我成功回调而不是错误,但成功后我得到了身份验证链接,而不
如何更改我的 Processing appIconTest.exe 导出的默认图标在窗口中的应用程序? 默认一个: 最佳答案 经过一些研究,我能找到的最简单的解决方案是: 进入 ...\process
我在 Processing 中做了一个简单的小游戏,但需要一些帮助。我有一个 mp3,想将它添加到我的应用程序中,以便在后台循环运行。 这可能吗?非常感谢。 最佳答案 您可以使用声音库。处理已经自带
我有几个这样创建的按钮: 在 setup() PImage[] imgs1 = {loadImage("AREA1_1.png"),loadImage("AREA1_2.png"),loadImage
我正在尝试使用 Processing 创建一个多人游戏,但无法弄清楚如何将屏幕分成两个以显示玩家的不同情况? 就像在 c# 中一样,我们有Viewport leftViewport,rightView
我一直在尝试使用 Moore 邻域在处理过程中创建元胞自动机,到目前为止非常成功。我已经设法使基本系统正常工作,现在我希望通过添加不同的功能来使用它。现在,我检查细胞是否存活。如果是,我使用 fill
有没有办法用 JavaScript 代码检查资源使用情况?我可以检查脚本的 RAM 使用情况和 CPU 使用情况吗? 由于做某事有多种方法,我可能会使用不同的方法编写代码,并将其保存为两个不同的文件,
我想弄清楚如何处理这样的列表: [ [[4,6,7], [1,2,4,6]] , [[10,4,2,4], [1]] ] 这是一个整数列表的列表 我希望我的函数将此列表作为输入并返回列表中没有重复的整
有没有办法在不需要时处理 MethodChannel/EventChannel ?我问是因为我想为对象创建多个方法/事件 channel 。 例子: class Call { ... fields
我有一个关于在 Python3 中处理 ConnectionResetError 的问题。这通常发生在我使用 urllib.request.Request 函数时。我想知道如果我们遇到这样的错误是否可
我一直在努力解决这个问题几个小时,但无济于事。代码很简单,一个弹跳球(粒子)。将粒子的速度初始化为 (0, 0) 将使其保持上下弹跳。将粒子的初始化速度更改为 (0, 0.01) 或任何十进制浮点数都
我把自己弄得一团糟。 我想在我的系统中添加 python3.6 所以我决定在我的 Ubuntu 19.10 中卸载现有的。但是现在每次我想安装一些东西我都会得到这样的错误: dpkg: error w
我正在努力解决 Rpart 包中的 NA 功能。我得到了以下数据框(下面的代码) Outcome VarA VarB 1 1 1 0 2 1 1 1
我将 Java 与 JSF 一起使用,这是 Glassfish 3 容器。 在我的 Web 应用程序中,我试图实现一个文件(图像)管理系统。 我有一个 config.properties我从中读取上传
所以我一直在Processing工作几个星期以来,虽然我没有编程经验,但我已经转向更复杂的项目。我正在编写一个进化模拟器,它会产生具有随机属性的生物。 最终,我将添加复制,但现在这些生物只是在屏幕上漂
有人知道 Delphi 2009 对“with”的处理有什么不同吗? 我昨天解决了一个问题,只是将“with”解构为完整引用,如“with Datamodule、Dataset、MainForm”。
我是一名优秀的程序员,十分优秀!