
c# - What is a good way to process many HTTP web requests in parallel?

Reposted. Author: 太空宇宙. Updated: 2023-11-03 11:14:42

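I'm building a generic URI retrieval system. Essentially there is a generic class, Retriever<T>, that maintains a queue of URIs to be retrieved. It has a separate thread that processes that queue as fast as it can. As the question title suggests, one example of a URI type is an HTTP URI.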

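The problem is that when I actually request that a resource be retrieved, via the abstract method T RetrieveResource(Uri location), it is slow because the call is synchronous.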

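Changing the return type of RetrieveResource to Task<T> was my first thought. However, when we have thousands of outstanding tasks, this seems to make the tasks pile up and cause a lot of problems. It appears to create many actual threads rather than using the thread pool. I imagine this just slows everything down: too many things are going on at once, so nothing makes significant progress individually.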

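We expect to have a large number of queued items to retrieve, and they cannot be processed as fast as they are enqueued. Over time the system has a chance to catch up, but it will definitely not be quick.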

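I have also thought about not maintaining a queue and a thread to process it at all, and instead just queuing a work item on the ThreadPool. However, I'm not sure that is ideal if, say, I need to shut the system down before all work items have been processed, or later want to allow prioritization or something similar.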

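We also know that retrieving a resource is a time-consuming process (0.250 to 5 seconds), but not necessarily a resource-intensive one. We are fine with parallelizing this into hundreds of requests.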

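Our requirements are:

  • URIs can be enqueued from any thread, even while the system is working on the queue
  • Retrieval may later need to be prioritized
  • It should be possible to pause retrieval
  • Minimal spinning should occur when nothing is being retrieved (this is where BlockingCollection is useful).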
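
As a rough illustration of the first and last requirements, here is a minimal sketch of a BlockingCollection<Uri> that is filled from several threads while one consumer drains it. The class name BlockingQueueSketch, the Run method, and the example.com URIs are hypothetical and not part of the original code. The consumer blocks inside GetConsumingEnumerable while the queue is empty instead of polling.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical sketch: two producers enqueue URIs while one consumer drains them.
public static class BlockingQueueSketch
{
    public static List<Uri> Run()
    {
        var pending = new BlockingCollection<Uri>(new ConcurrentQueue<Uri>());
        var seen = new List<Uri>(); // only touched by the consumer thread

        var consumer = new Thread(() =>
        {
            // Blocks while the queue is empty; the loop ends once
            // CompleteAdding has been called and the queue is drained.
            foreach (Uri location in pending.GetConsumingEnumerable())
            {
                seen.Add(location);
            }
        });
        consumer.Start();

        var producers = new Thread[2];
        for (int i = 0; i < producers.Length; i++)
        {
            int id = i; // copy the loop variable for the closure
            producers[i] = new Thread(() =>
            {
                for (int n = 0; n < 3; n++)
                {
                    // Add is safe to call from any thread.
                    pending.Add(new Uri("http://example.com/" + id + "/" + n));
                }
            });
            producers[i].Start();
        }

        foreach (Thread producer in producers)
        {
            producer.Join();
        }

        pending.CompleteAdding();
        consumer.Join();
        return seen;
    }
}
```

Calling BlockingQueueSketch.Run() returns the six URIs in whatever order the two producers happened to interleave them.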

Is there a good way to parallelize this without introducing unnecessary complexity?

Below is some of our existing code, as an example.

public abstract class Retriever<T> : IRetriever<T>, IDisposable
{
    private readonly Thread worker;
    private readonly BlockingCollection<Uri> pending;
    private volatile int isStarted;
    private volatile int isDisposing;

    public event EventHandler<RetrievalEventArgs<T>> Retrieved;

    protected Retriever()
    {
        this.worker = new Thread(this.RetrieveResources);
        this.pending = new BlockingCollection<Uri>(new ConcurrentQueue<Uri>());
        this.isStarted = 0;
        this.isDisposing = 0;
    }

    ~Retriever()
    {
        this.Dispose(false);
    }

    private void RetrieveResources()
    {
        while (this.isDisposing == 0)
        {
            // Monitor.Wait throws SynchronizationLockException unless the
            // calling thread holds the lock on the same object.
            lock (this.pending)
            {
                while (this.isStarted == 0)
                {
                    Monitor.Wait(this.pending);
                }
            }

            Uri location = this.pending.Take();

            // This is what needs to be concurrently done.
            // In this example, it's synchronous, but just on a separate thread.
            T result = this.RetrieveResource(location);

            // At this point, we would fire our event with the retrieved data
        }
    }

    protected abstract T RetrieveResource(Uri location);

    protected void Dispose(bool disposing)
    {
        if (Interlocked.CompareExchange(ref this.isDisposing, 1, 0) == 1)
        {
            return;
        }

        if (disposing)
        {
            this.pending.CompleteAdding();
            this.worker.Join();
        }
    }

    public void Add(Uri uri)
    {
        try
        {
            this.pending.Add(uri);
        }
        catch (InvalidOperationException)
        {
            return;
        }
    }

    public void AddRange(IEnumerable<Uri> uris)
    {
        foreach (Uri uri in uris)
        {
            try
            {
                this.pending.Add(uri);
            }
            catch (InvalidOperationException)
            {
                return;
            }
        }
    }

    public void Start()
    {
        if (Interlocked.CompareExchange(ref this.isStarted, 1, 0) == 1)
        {
            throw new InvalidOperationException("The retriever is already started.");
        }

        if (this.worker.ThreadState == ThreadState.Unstarted)
        {
            this.worker.Start();
        }

        // Monitor.Pulse must also be called while holding the lock.
        lock (this.pending)
        {
            Monitor.Pulse(this.pending);
        }
    }

    public void Stop()
    {
        if (Interlocked.CompareExchange(ref this.isStarted, 0, 1) == 0)
        {
            throw new InvalidOperationException("The retriever is already stopped.");
        }
    }

    public void Dispose()
    {
        this.Dispose(true);
        GC.SuppressFinalize(this);
    }
}

To build on the example above, here is a solution that I think adds too much complexity, or rather, weird code:

    private void RetrieveResources()
    {
        while (this.isDisposing == 0)
        {
            // Monitor.Wait requires the lock on the same object to be held.
            lock (this.pending)
            {
                while (this.isStarted == 0)
                {
                    Monitor.Wait(this.pending);
                }
            }

            Uri location = this.pending.Take();

            Task<T> task = new Task<T>((state) =>
            {
                return this.RetrieveResource(state as Uri);
            }, location);

            task.ContinueWith((t) =>
            {
                T result = t.Result;
                RetrievalEventArgs<T> args = new RetrievalEventArgs<T>(location, result);

                EventHandler<RetrievalEventArgs<T>> callback = this.Retrieved;
                if (!Object.ReferenceEquals(callback, null))
                {
                    callback(this, args);
                }
            });

            task.Start();
        }
    }

Best answer

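I have come up with what I think is a pretty good solution. I abstracted both the method of retrieving a resource and the representation of the result. This allows retrieval of arbitrary URIs with arbitrary results; kind of like a URI-driven "ORM".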

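It supports a variable level of concurrency. When I posted the question the other day, I had forgotten that asynchrony and concurrency are quite different things. All I was achieving with tasks was asynchrony and jamming up the task scheduler, when what I really wanted was concurrency.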
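
To make the idea of a concurrency level concrete, here is a minimal sketch, separate from the Retriever<T> class itself, in which a fixed number of threads drain one shared queue. The class FixedConcurrencySketch, its Run method, and the Thread.Sleep call standing in for a slow retrieval are all assumptions made for illustration. The method reports the highest number of simulated retrievals that were in flight at once, which cannot exceed the number of worker threads.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical sketch: `concurrency` threads share one queue of work items.
public static class FixedConcurrencySketch
{
    // Returns the peak number of simulated retrievals running at the same time.
    public static int Run(int concurrency, int itemCount)
    {
        var pending = new BlockingCollection<int>(new ConcurrentQueue<int>());
        object gate = new object();
        int inFlight = 0;
        int peak = 0;

        var workers = new Thread[concurrency];
        for (int i = 0; i < workers.Length; i++)
        {
            workers[i] = new Thread(() =>
            {
                foreach (int item in pending.GetConsumingEnumerable())
                {
                    lock (gate)
                    {
                        inFlight++;
                        if (inFlight > peak)
                        {
                            peak = inFlight;
                        }
                    }

                    // Stands in for a slow, blocking retrieval.
                    Thread.Sleep(20);

                    lock (gate)
                    {
                        inFlight--;
                    }
                }
            });
            workers[i].Start();
        }

        for (int n = 0; n < itemCount; n++)
        {
            pending.Add(n);
        }

        // Lets the workers' loops end once the queue is empty.
        pending.CompleteAdding();

        foreach (Thread worker in workers)
        {
            worker.Join();
        }

        return peak;
    }
}
```

For example, FixedConcurrencySketch.Run(4, 20) returns a value between 1 and 4. Raising the first argument raises the ceiling on simultaneous retrievals without changing how items are enqueued.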

I added cancellation because it seemed like a good idea to have start/stop capability.

public abstract class Retriever<T> : IRetriever<T>
{
    private readonly object locker;
    private readonly BlockingCollection<Uri> pending;
    private readonly Thread[] threads;
    private CancellationTokenSource cancellation;

    private volatile int isStarted;
    private volatile int isDisposing;

    public event EventHandler<RetrieverEventArgs<T>> Retrieved;

    protected Retriever(int concurrency)
    {
        if (concurrency <= 0)
        {
            throw new ArgumentOutOfRangeException("concurrency", "The specified concurrency level must be greater than zero.");
        }

        this.locker = new object();
        this.pending = new BlockingCollection<Uri>(new ConcurrentQueue<Uri>());
        this.threads = new Thread[concurrency];
        this.cancellation = new CancellationTokenSource();

        this.isStarted = 0;
        this.isDisposing = 0;

        this.InitializeThreads();
    }

    ~Retriever()
    {
        this.Dispose(false);
    }

    private void InitializeThreads()
    {
        for (int i = 0; i < this.threads.Length; i++)
        {
            Thread thread = new Thread(this.ProcessQueue)
            {
                IsBackground = true
            };

            this.threads[i] = thread;
        }
    }

    private void StartThreads()
    {
        foreach (Thread thread in this.threads)
        {
            if (thread.ThreadState == ThreadState.Unstarted)
            {
                thread.Start();
            }
        }
    }

    private void CancelOperations(bool reset)
    {
        this.cancellation.Cancel();
        this.cancellation.Dispose();

        if (reset)
        {
            this.cancellation = new CancellationTokenSource();
        }
    }

    private void WaitForThreadsToExit()
    {
        foreach (Thread thread in this.threads)
        {
            thread.Join();
        }
    }

    private void ProcessQueue()
    {
        while (this.isDisposing == 0)
        {
            // Monitor.Wait throws SynchronizationLockException unless the
            // calling thread holds the lock on the same object.
            lock (this.locker)
            {
                while (this.isStarted == 0)
                {
                    Monitor.Wait(this.locker);
                }
            }

            Uri location;

            try
            {
                location = this.pending.Take(this.cancellation.Token);
            }
            catch (OperationCanceledException)
            {
                continue;
            }

            T data;

            try
            {
                data = this.Retrieve(location, this.cancellation.Token);
            }
            catch (OperationCanceledException)
            {
                continue;
            }

            RetrieverEventArgs<T> args = new RetrieverEventArgs<T>(location, data);

            EventHandler<RetrieverEventArgs<T>> callback = this.Retrieved;
            if (!Object.ReferenceEquals(callback, null))
            {
                callback(this, args);
            }
        }
    }

    private void ThrowIfDisposed()
    {
        if (this.isDisposing == 1)
        {
            throw new ObjectDisposedException("Retriever");
        }
    }

    protected abstract T Retrieve(Uri location, CancellationToken token);

    protected virtual void Dispose(bool disposing)
    {
        if (Interlocked.CompareExchange(ref this.isDisposing, 1, 0) == 1)
        {
            return;
        }

        if (disposing)
        {
            this.CancelOperations(false);
            this.WaitForThreadsToExit();
            this.pending.Dispose();
        }
    }

    public void Start()
    {
        this.ThrowIfDisposed();

        if (Interlocked.CompareExchange(ref this.isStarted, 1, 0) == 1)
        {
            throw new InvalidOperationException("The retriever is already started.");
        }

        // Monitor.PulseAll must also be called while holding the lock.
        lock (this.locker)
        {
            Monitor.PulseAll(this.locker);
        }

        this.StartThreads();
    }

    public void Add(Uri location)
    {
        this.pending.Add(location);
    }

    public void Stop()
    {
        this.ThrowIfDisposed();

        if (Interlocked.CompareExchange(ref this.isStarted, 0, 1) == 0)
        {
            throw new InvalidOperationException("The retriever is already stopped.");
        }

        this.CancelOperations(true);
    }

    public void Dispose()
    {
        this.Dispose(true);
        GC.SuppressFinalize(this);
    }
}
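
The stop path above relies on a blocked Take call being released when the token is cancelled. The following minimal sketch isolates that behavior. The class CancellableTakeSketch and its Run method are hypothetical names; only BlockingCollection<T>.Take(CancellationToken) and CancellationTokenSource come from the code above.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical sketch: a worker blocked on an empty queue is released by cancellation.
public static class CancellableTakeSketch
{
    // Returns true if the worker observed OperationCanceledException.
    public static bool Run()
    {
        var pending = new BlockingCollection<Uri>(new ConcurrentQueue<Uri>());
        var cancellation = new CancellationTokenSource();
        bool wasCancelled = false;

        var worker = new Thread(() =>
        {
            try
            {
                // The queue is empty, so this blocks until an item arrives
                // or the token is cancelled. A token that is already
                // cancelled makes the call throw immediately.
                pending.Take(cancellation.Token);
            }
            catch (OperationCanceledException)
            {
                wasCancelled = true;
            }
        });

        worker.Start();
        cancellation.Cancel();

        // Wait for the worker before disposing anything it still uses.
        worker.Join();
        cancellation.Dispose();
        pending.Dispose();

        return wasCancelled;
    }
}
```

Here CancellableTakeSketch.Run() returns true. The sketch disposes the CancellationTokenSource only after the worker has exited. Disposing it while a worker may still read its Token property risks an ObjectDisposedException.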

Regarding "c# - What is a good way to process many HTTP web requests in parallel?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/13026744/
