gpt4 book ai didi

c# - 如何使用 C# 任务并行库和 IProducerConsumerCollection 实现通用回调?

转载 作者:太空宇宙 更新时间:2023-11-03 11:07:03 24 4
gpt4 key购买 nike

我有一个组件可以向基于网络的 API 提交请求,但必须限制这些请求以免违反 API 的数据限制。这意味着所有请求都必须通过队列以控制它们的提交速率,但它们可以(并且应该)并发执行以实现最大吞吐量。每个请求必须在将来某个时间完成时将一些数据返回给调用代码。

我正在努力创建一个很好的模型来处理数据返回。

使用 BlockingCollection我不能只返回 Task<TResult>来自 Schedule方法,因为入队和出队过程在缓冲区的两端。所以我创建了一个 RequestItem<TResult>包含 Action<Task<TResult>> 形式的回调的类型.

想法是,一旦从队列中拉出一个项目,就可以使用启动的任务调用回调,但是到那时我已经丢失了通用类型参数,只能使用反射和各种肮脏的方法(如果可能的话)。

例如:

public class RequestScheduler 
{
private readonly BlockingCollection<IRequestItem> _queue = new BlockingCollection<IRequestItem>();

public RequestScheduler()
{
this.Start();
}

// This can't return Task<TResult>, so returns void.
// Instead RequestItem is generic but this poses problems when adding to the queue
public void Schedule<TResult>(RequestItem<TResult> request)
{
_queue.Add(request);
}

private void Start()
{
Task.Factory.StartNew(() =>
{
foreach (var item in _queue.GetConsumingEnumerable())
{
// I want to be able to use the original type parameters here
// is there a nice way without reflection?
// ProcessItem submits an HttpWebRequest
Task.Factory.StartNew(() => ProcessItem(item))
.ContinueWith(t => { item.Callback(t); });
}
});
}

public void Stop()
{
_queue.CompleteAdding();
}
}

public class RequestItem<TResult> : IRequestItem
{
public IOperation<TResult> Operation { get; set; }
public Action<Task<TResult>> Callback { get; set; }
}

如何继续缓冲我的请求但返回 Task<TResult>当请求从缓冲区中提取并提交给 API 时发送给客户端?

最佳答案

首先,您可以返回Task<TResult>来自 Schedule() , 你只需要使用 TaskCompletionSource 为此。

其次,为了解决通用性问题,您可以将所有内容隐藏在内部(非通用)Action秒。在 Schedule() ,使用完全满足您需要的 lambda 创建操作。消费循环随后将执行该操作,它不需要知道里面是什么。

第三,我不明白你为什么要开始一个新的Task在循环的每次迭代中。首先,这意味着您实际上不会受到任何限制。

经过这些修改,代码可能如下所示:

public class RequestScheduler
{
private readonly BlockingCollection<Action> m_queue = new BlockingCollection<Action>();

public RequestScheduler()
{
this.Start();
}

private void Start()
{
Task.Factory.StartNew(() =>
{
foreach (var action in m_queue.GetConsumingEnumerable())
{
action();
}
}, TaskCreationOptions.LongRunning);
}

public Task<TResult> Schedule<TResult>(IOperation<TResult> operation)
{
var tcs = new TaskCompletionSource<TResult>();

Action action = () =>
{
try
{
tcs.SetResult(ProcessItem(operation));
}
catch (Exception e)
{
tcs.SetException(e);
}
};

m_queue.Add(action);

return tcs.Task;
}

private T ProcessItem<T>(IOperation<T> operation)
{
// whatever
}
}

关于c# - 如何使用 C# 任务并行库和 IProducerConsumerCollection 实现通用回调?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/15506718/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com